incubator-ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiten...@apache.org
Subject svn commit: r1397417 - in /incubator/ambari/branches/AMBARI-666: ./ ambari-agent/src/main/python/ambari_agent/ ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ ambari-server/src/main/java/org/apache/ambari/server/agent/ ambari-server...
Date Fri, 12 Oct 2012 02:03:57 GMT
Author: jitendra
Date: Fri Oct 12 02:03:57 2012
New Revision: 1397417

URL: http://svn.apache.org/viewvc?rev=1397417&view=rev
Log:
AMBARI-850. Flatten execution command structure.

Modified:
    incubator/ambari/branches/AMBARI-666/AMBARI-666-CHANGES.txt
    incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
    incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/utils/TestStageUtils.java

Modified: incubator/ambari/branches/AMBARI-666/AMBARI-666-CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/AMBARI-666-CHANGES.txt?rev=1397417&r1=1397416&r2=1397417&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/AMBARI-666-CHANGES.txt (original)
+++ incubator/ambari/branches/AMBARI-666/AMBARI-666-CHANGES.txt Fri Oct 12 02:03:57 2012
@@ -11,6 +11,8 @@ AMBARI-666 branch (unreleased changes)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+
+  AMBARI-850. Flatten ExecutionCommand structure. (jitendra)
   
   AMBARI-848. Various tests for FSM and Controller impl. (hitesh)
 

Modified: incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/ActionQueue.py?rev=1397417&r1=1397416&r2=1397417&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/ActionQueue.py (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/ActionQueue.py Fri Oct 12 02:03:57 2012
@@ -106,22 +106,23 @@ class ActionQueue(threading.Thread):
     clusterName = command['clusterName']
     commandId = command['commandId']
     hostname = command['hostname']
-    params = command['params']
+    params = command['hostLevelParams']
     clusterHostInfo = command['clusterHostInfo']
-    roleCommands = command['roleCommands']
+    roleCommand = command['roleCommand']
+    serviceName = command['serviceName']
     configurations = command['configurations']
     result = []
-    for roleCommand in roleCommands:
-      # assume some puppet pluing to run these commands
-      roleResult = {'role' : roleCommand['role'],
-                    'actionId' : commandId,
-                    'stdout' : "DONE",
-                    'clusterName' : clusterName,
-                    'stderr' : "DONE",
-                    'exitCode' : 0,
-                    'status' : "COMPLETED"}
-      result.append(roleResult)
-      pass
+    # assume some puppet plugin to run these commands
+    roleResult = {'role' : command['role'],
+                  'actionId' : commandId,
+                  'stdout' : "DONE",
+                  'clusterName' : clusterName,
+                  'stderr' : "DONE",
+                  'exitCode' : 0,
+                  'serviceName' : serviceName,
+                  'status' : "COMPLETED"}
+    result.append(roleResult)
+    pass
     return result
 
   def noOpCommand(self, command):

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java?rev=1397417&r1=1397416&r2=1397417&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java Fri Oct 12 02:03:57 2012
@@ -22,7 +22,8 @@ import java.util.List;
 
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.agent.CommandReport;
-import org.apache.ambari.server.utils.StageUtils;
+import org.apache.ambari.server.agent.ExecutionCommand;
+import org.mortbay.log.Log;
 
 import com.google.inject.Singleton;
 
@@ -59,9 +60,16 @@ public class ActionDBInMemoryImpl implem
   public synchronized void abortOperation(long requestId) {
     for (Stage s : stageList) {
       if (s.getRequestId() == requestId) {
-        for(String host: s.getHostActions().keySet()) {
-          for (HostRoleCommand role : s.getHostActions().get(host).getRoleCommands()) {
-            role.setStatus(HostRoleStatus.ABORTED);
+        for (String host : s.getHosts()) {
+          for (ExecutionCommand cmd : s.getExecutionCommands(host)) {
+            HostRoleStatus status = s.getHostRoleStatus(host, cmd.getRole()
+                .toString());
+            if (status.equals(HostRoleStatus.IN_PROGRESS)
+                || status.equals(HostRoleStatus.QUEUED)
+                || status.equals(HostRoleStatus.PENDING)) {
+              s.setHostRoleStatus(host, cmd.getRole().toString(),
+                  HostRoleStatus.ABORTED);
+            }
           }
         }
       }
@@ -72,11 +80,7 @@ public class ActionDBInMemoryImpl implem
   public synchronized void timeoutHostRole(String host, long requestId,
       long stageId, Role role) {
     for (Stage s : stageList) {
-      for (HostRoleCommand r : s.getHostActions().get(host).getRoleCommands()) {
-        if (r.getRole().equals(role)) {
-          r.setStatus(HostRoleStatus.TIMEDOUT);
-        }
-      }
+      s.setHostRoleStatus(host, role.toString(), HostRoleStatus.TIMEDOUT);
     }
   }
 
@@ -100,17 +104,18 @@ public class ActionDBInMemoryImpl implem
   @Override
   public synchronized void updateHostRoleState(String hostname, long requestId,
       long stageId, String role, CommandReport report) {
+    Log.info("DEBUG stages to iterate: "+stageList.size());
     for (Stage s : stageList) {
-      for (HostRoleCommand r : s.getHostActions().get(hostname).getRoleCommands()) {
-        if (r.getRole().toString().equals(role)) {
-          r.setStatus(HostRoleStatus.valueOf(report.getStatus()));
-          r.setExitCode(report.getExitCode());
-          r.setStderr(report.getStdErr());
-          r.setStdout(report.getStdOut());
-        }
+      if (s.getRequestId() == requestId && s.getStageId() == stageId) {
+        s.setHostRoleStatus(hostname, role,
+            HostRoleStatus.valueOf(report.getStatus()));
+        s.setExitCode(hostname, role, report.getExitCode());
+        s.setStderr(hostname, role, report.getStdErr());
+        s.setStdout(hostname, role, report.getStdOut());
       }
     }
   }
+  
   @Override
   public void abortHostRole(String host, long requestId, long stageId, Role role) {
     CommandReport report = new CommandReport();

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java?rev=1397417&r1=1397416&r2=1397417&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java Fri Oct 12 02:03:57 2012
@@ -17,30 +17,22 @@
  */
 package org.apache.ambari.server.actionmanager;
 
-import java.net.InetAddress;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-import javax.xml.bind.JAXBException;
-
 import org.apache.ambari.server.AmbariException;
-import org.apache.ambari.server.Role;
 import org.apache.ambari.server.agent.ActionQueue;
-import org.apache.ambari.server.agent.AgentCommand;
+import org.apache.ambari.server.agent.ExecutionCommand;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.ServiceComponentHost;
-import org.apache.ambari.server.state.ServiceComponentImpl;
-import org.apache.ambari.server.state.ServiceImpl;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitonException;
-
-import org.apache.ambari.server.state.svccomphost.ServiceComponentHostImpl;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
 import org.apache.ambari.server.utils.StageUtils;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -94,123 +86,173 @@ class ActionScheduler implements Runnabl
       }
     }
   }
-
+  
   private void doWork() throws AmbariException {
-    LOG.info("Scheduler wakes up");
     List<Stage> stages = db.getStagesInProgress();
+    LOG.info("Scheduler wakes up, number of stages to look at:"+stages.size());
     if (stages == null || stages.isEmpty()) {
       //Nothing to do
       LOG.info("No stage in progress..nothing to do");
       return;
     }
-
-    //First discover completions and timeouts.
-    boolean operationFailure = false;
+    
     for (Stage s : stages) {
-      Map<Role, Map<String, HostRoleCommand>> roleToHrcMap = getInvertedRoleMap(s);
-
-      //Iterate for completion
-      boolean moveToNextStage = true;
-      for (Role r: roleToHrcMap.keySet()) {
-        processPendingsAndReschedule(s, roleToHrcMap.get(r));
-        RoleStatus roleStatus = getRoleStatus(roleToHrcMap.get(r), s.getSuccessFactor(r));
-        if (!roleStatus.isRoleSuccessful()) {
-          if (!roleStatus.isRoleInProgress()) {
-            //The role has completely failed
-            //Mark the entire operation as failed
-            operationFailure = true;
-            break;
-          }
-          moveToNextStage = false;
+      List<ExecutionCommand> commandsToSchedule = new ArrayList<ExecutionCommand>();
+      Map<String, RoleStats> roleStats = processInProgressStage(s, commandsToSchedule);
+      //Check if stage is failed
+      boolean failed = false;
+      for (String role : roleStats.keySet()) {
+        RoleStats stats = roleStats.get(role);
+        LOG.info("Stats for role:"+role+", stats="+stats);
+        if (stats.isRoleFailed()) {
+          failed = true;
+          break;
         }
       }
-      if (operationFailure) {
+      if (failed) {
+        LOG.warn("Operation completely failed, borting request id:"
+            + s.getRequestId());
         db.abortOperation(s.getRequestId());
+        return;
       }
-      if (operationFailure || !moveToNextStage) {
-        break;
+
+      //Schedule what we have so far
+      for (ExecutionCommand cmd : commandsToSchedule) {
+        try {
+          scheduleHostRole(s, cmd);
+        } catch (InvalidStateTransitonException e) {
+          LOG.warn("Could not schedule host role "+cmd.toString(), e);
+          db.abortHostRole(cmd.getHostname(), s.getRequestId(), s.getStageId(),
+              cmd.getRole());
+        }
+      }
+      
+      //Check if ready to go to next stage
+      boolean goToNextStage = true;
+      for (String role: roleStats.keySet()) {
+        RoleStats stats = roleStats.get(role);
+        if (!stats.isSuccessFactorMet()) {
+          goToNextStage = false;
+          break;
+        }
+      }
+      if (!goToNextStage) {
+        return;
       }
     }
   }
-
-  private void processPendingsAndReschedule(Stage stage,
-      Map<String, HostRoleCommand> hrcMap) throws AmbariException {
-    LOG.info("Processing pending and queued actions");
-    for (String host : hrcMap.keySet()) {
-      HostRoleCommand hrc = hrcMap.get(host);
-      if ( (hrc.getStatus() != HostRoleStatus.PENDING) &&
-           (hrc.getStatus() != HostRoleStatus.QUEUED) ) {
-        //This task has been executed
-        continue;
-      }
-      long now = System.currentTimeMillis();
-      LOG.info("Last attempt time =" + stage.getLastAttemptTime(host)
-          + ", actiontimeout =" + this.actionTimeout + ", current time=" + now);
-      if (now > stage.getLastAttemptTime(host)+actionTimeout) {
-        LOG.info("Host:"+host+", role:"+hrc.getRole()+", actionId:"+stage.getActionId()+" timed out");
-        if (stage.getAttemptCount(host) >= maxAttempts) {
-          LOG.warn("Host:"+host+", role:"+hrc.getRole()+", actionId:"+stage.getActionId()+" expired");
-          // final expired
-          ServiceComponentHostOpFailedEvent timeoutEvent =
-              new ServiceComponentHostOpFailedEvent(hrc.getRole().toString(),
-                  host, now);
-          try {
-            Cluster c = fsmObject.getCluster(stage.getClusterName());
-            Service svc = c.getService(hrc.getServiceName());
-            ServiceComponent svcComp = svc.getServiceComponent(
-                hrc.getRole().toString());
-            ServiceComponentHost svcCompHost =
-                svcComp.getServiceComponentHost(host);
-            svcCompHost.handleEvent(timeoutEvent);
-          } catch (InvalidStateTransitonException e) {
-            LOG.info("Transition failed for host: "+host+", role: "+hrc.getRole(), e);
-          }
-          db.timeoutHostRole(host, stage.getRequestId(), stage.getStageId(),
-              hrc.getRole());
-        } else {
-          try {
-            scheduleHostRole(stage, host, hrc);
-          } catch (InvalidStateTransitonException ex) {
-            LOG.info("Cannot make this transition..aborting host role", ex);
-            db.abortHostRole(host, stage.getRequestId(), stage.getStageId(),
-                hrc.getRole());
+  
+  /**
+   * @param commandsToSchedule output list to which commands needing a first or repeated scheduling attempt are added
+   * @return Stats for the roles in the stage. It is used to determine whether stage 
+   * has succeeded or failed.
+   */
+  private Map<String, RoleStats> processInProgressStage(Stage s,
+      List<ExecutionCommand> commandsToSchedule) {
+    // Map to track role status
+    Map<String, RoleStats> roleStats = new TreeMap<String, RoleStats>();
+    long now = System.currentTimeMillis();
+    for (String host : s.getHosts()) {
+      List<ExecutionCommand> commands = s.getExecutionCommands(host);
+      for(ExecutionCommand c : commands) {
+        String roleStr = c.getRole().toString();
+        RoleStats stats = roleStats.get(roleStr);
+        if (stats == null) {
+          stats = new RoleStats(s.getHosts().size(), 1);
+          roleStats.put(roleStr, stats);
+        }
+        HostRoleStatus status = s.getHostRoleStatus(host, roleStr);    
+        LOG.info("Last attempt time =" + s.getLastAttemptTime(host, roleStr)
+            + ", actiontimeout =" + this.actionTimeout + ", current time="
+            + now);
+        if (timeOutActionNeeded(status, s, host, roleStr, now)) {
+          LOG.info("Host:" + host + ", role:" + roleStr + ", actionId:"
+              + s.getActionId() + " timed out");
+          if (s.getAttemptCount(host, roleStr) >= maxAttempts) {
+            LOG.warn("Host:" + host + ", role:" + roleStr + ", actionId:"
+                + s.getActionId() + " expired");
+            db.timeoutHostRole(host, s.getRequestId(), s.getStageId(),
+                c.getRole());
+            //Reinitialize status
+            status = s.getHostRoleStatus(host, roleStr);
+            ServiceComponentHostOpFailedEvent timeoutEvent =
+                new ServiceComponentHostOpFailedEvent(roleStr,
+                    host, now);
+            try {
+              Cluster cluster = fsmObject.getCluster(s.getClusterName());
+              Service svc = cluster.getService(c.getServiceName());
+              ServiceComponent svcComp = svc.getServiceComponent(
+                  roleStr);
+              ServiceComponentHost svcCompHost =
+                  svcComp.getServiceComponentHost(host);
+              svcCompHost.handleEvent(timeoutEvent);
+            } catch (InvalidStateTransitonException e) {
+              LOG.info("Transition failed for host: "+host+", role: "+roleStr, e);
+            } catch (AmbariException ex) {
+              LOG.info("Invalid live state", ex);
+            }
+          } else {
+            commandsToSchedule.add(c);
           }
+        } else if (status.equals(HostRoleStatus.PENDING)) {
+          //Need to schedule first time
+          commandsToSchedule.add(c);
         }
+        this.updateRoleStats(status, stats);
       }
     }
+    return roleStats;
+  }
+
+  private boolean timeOutActionNeeded(HostRoleStatus status, Stage stage, 
+      String host, String role, long currentTime) {
+    LOG.info("Last attempt time =" + stage.getLastAttemptTime(host, role)
+        + ", actiontimeout =" + this.actionTimeout + ", current time="
+        + currentTime+", role="+role+", status="+status);
+    
+    if (( !status.equals(HostRoleStatus.QUEUED) ) &&
+        ( ! status.equals(HostRoleStatus.IN_PROGRESS) )) {
+      return false;
+    }
+    if (currentTime > stage.getLastAttemptTime(host, role)+actionTimeout) {
+      return true;
+    }
+    return false;
   }
 
-  private void scheduleHostRole(Stage s, String hostname, HostRoleCommand hrc)
+  private void scheduleHostRole(Stage s, ExecutionCommand cmd)
       throws InvalidStateTransitonException, AmbariException {
     long now = System.currentTimeMillis();
-    LOG.info("Host:" + hostname + ", role:" + hrc.getRole() + ", actionId:"
+    String roleStr = cmd.getRole().toString();
+    String hostname = cmd.getHostname();
+    LOG.info("Host:" + hostname + ", role:" + cmd.getRole() + ", actionId:"
         + s.getActionId() + " being scheduled"+", current time: "+now+", start time: "+
-        s.getStartTime(hostname));
-    if (s.getStartTime(hostname) < 0) {
+        s.getStartTime(hostname, roleStr));
+    if (s.getStartTime(hostname, roleStr) < 0) {
       LOG.info("Update state machine for first attempt");
       try {
         Cluster c = fsmObject.getCluster(s.getClusterName());
-        Service svc = c.getService(hrc.getServiceName());
-        ServiceComponent svcComp = svc.getServiceComponent(hrc.getRole().toString());
+        Service svc = c.getService(cmd.getServiceName());
+        ServiceComponent svcComp = svc.getServiceComponent(roleStr);
         ServiceComponentHost svcCompHost =
             svcComp.getServiceComponentHost(hostname);
-        svcCompHost.handleEvent(hrc.getEvent());
-        s.setStartTime(hostname, now);
+        svcCompHost.handleEvent(s.getFsmEvent(hostname, roleStr));
+        s.setStartTime(hostname,roleStr, now);
+        s.setHostRoleStatus(hostname, roleStr, HostRoleStatus.QUEUED);
       } catch (InvalidStateTransitonException e) {
         LOG.info(
             "Transition failed for host: " + hostname + ", role: "
-                + hrc.getRole(), e);
+                + roleStr, e);
         throw e;
       } catch (AmbariException e) {
-        LOG.info("Exception in fsm: " + hostname + ", role: " + hrc.getRole(),
+        LOG.info("Exception in fsm: " + hostname + ", role: " + roleStr,
             e);
         throw e;
       }
     }
-    s.setLastAttemptTime(hostname, now);
-    s.incrementAttemptCount(hostname);
+    s.setLastAttemptTime(hostname, roleStr, now);
+    s.incrementAttemptCount(hostname, roleStr);
     LOG.info("Enqueueing in action queue for host: "+hostname);
-    AgentCommand cmd = s.getExecutionCommand(hostname);
     try {
       LOG.info("Command string = " + StageUtils.jaxbToString(cmd));
     } catch (Exception e) {
@@ -219,54 +261,29 @@ class ActionScheduler implements Runnabl
     actionQueue.enqueue(hostname, cmd);
   }
 
-  private RoleStatus getRoleStatus(
-      Map<String, HostRoleCommand> hostRoleCmdForRole, float successFactor) {
-    RoleStatus rs = new RoleStatus(hostRoleCmdForRole.size(), successFactor);
-    for (String h : hostRoleCmdForRole.keySet()) {
-      HostRoleCommand hrc = hostRoleCmdForRole.get(h);
-      switch (hrc.getStatus()) {
-      case COMPLETED:
-        rs.numSucceeded++;
-        break;
-      case FAILED:
-        rs.numFailed++;
-        break;
-      case QUEUED:
-        rs.numQueued++;
-        break;
-      case PENDING:
-        rs.numPending++;
-        break;
-      case TIMEDOUT:
-        rs.numTimedOut++;
-        break;
-      case ABORTED:
-        rs.numAborted++;
-      }
+  private void updateRoleStats(HostRoleStatus status, RoleStats rs) {
+    switch (status) {
+    case COMPLETED:
+      rs.numSucceeded++;
+      break;
+    case FAILED:
+      rs.numFailed++;
+      break;
+    case QUEUED:
+      rs.numQueued++;
+      break;
+    case PENDING:
+      rs.numPending++;
+      break;
+    case TIMEDOUT:
+      rs.numTimedOut++;
+      break;
+    case ABORTED:
+      rs.numAborted++;
     }
-    return rs;
   }
 
-  private Map<Role, Map<String, HostRoleCommand>> getInvertedRoleMap(Stage s) {
-    // Temporary to store role to host
-    Map<Role, Map<String, HostRoleCommand>> roleToHrcMap = new TreeMap<Role, Map<String, HostRoleCommand>>();
-    Map<String, HostAction> hostActions = s.getHostActions();
-    for (String h : hostActions.keySet()) {
-      HostAction ha = hostActions.get(h);
-      List<HostRoleCommand> roleCommands = ha.getRoleCommands();
-      for (HostRoleCommand hrc : roleCommands) {
-        Map<String, HostRoleCommand> hrcMap = roleToHrcMap.get(hrc.getRole());
-        if (hrcMap == null) {
-          hrcMap = new TreeMap<String, HostRoleCommand>();
-          roleToHrcMap.put(hrc.getRole(), hrcMap);
-        }
-        hrcMap.put(h, hrc);
-      }
-    }
-    return roleToHrcMap;
-  }
-
-  static class RoleStatus {
+  static class RoleStats {
     int numQueued = 0;
     int numSucceeded = 0;
     int numFailed = 0;
@@ -276,12 +293,15 @@ class ActionScheduler implements Runnabl
     final int totalHosts;
     final float successFactor;
 
-    RoleStatus(int total, float successFactor) {
+    RoleStats(int total, float successFactor) {
       this.totalHosts = total;
       this.successFactor = successFactor;
     }
 
-    boolean isRoleSuccessful() {
+    /**
+     * Role successful means enough hosts succeeded (success factor met) to go to the next stage.
+     */
+    boolean isSuccessFactorMet() {
       if (successFactor <= (1.0*numSucceeded)/totalHosts) {
         return true;
       } else {
@@ -289,16 +309,33 @@ class ActionScheduler implements Runnabl
       }
     }
 
-    boolean isRoleInProgress() {
+    private boolean isRoleInProgress() {
       return (numPending+numQueued > 0);
     }
 
+    /**
+     * Role failure means role is no longer in progress and success factor is
+     * not met.
+     */
     boolean isRoleFailed() {
-      if ((!isRoleInProgress()) && (!isRoleSuccessful())) {
+      if (isRoleInProgress() || isSuccessFactorMet()) {
         return false;
       } else {
         return true;
       }
     }
+    
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("numQueued="+numQueued);
+      builder.append(", numSucceeded="+numSucceeded);
+      builder.append(", numFailed="+numFailed);
+      builder.append(", numTimedOut="+numTimedOut);
+      builder.append(", numPending="+numPending);
+      builder.append(", numAborted="+numAborted);
+      builder.append(", totalHosts="+totalHosts);
+      builder.append(", successFactor="+successFactor);
+      return builder.toString();
+    }
   }
 }

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java?rev=1397417&r1=1397416&r2=1397417&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java Fri Oct 12 02:03:57 2012
@@ -27,20 +27,21 @@ import org.apache.ambari.server.state.Se
  * track the request.
  * For the actual command refer {@link HostAction#commandToHost}
  */
-public class HostRoleCommand {
+class HostRoleCommand {
   private final Role role;
   private HostRoleStatus status = HostRoleStatus.PENDING;
   private String stdout = "";
   private String stderr = "";
   private int exitCode = 999; //Default is unknown
   private final ServiceComponentHostEvent event;
-  private String serviceName;
+  private long startTime = -1;
+  private long lastAttemptTime = -1;
+  private short attemptCount = 0;
 
-  public HostRoleCommand(String serviceName, Role role,
+  public HostRoleCommand(String host, Role role,
       ServiceComponentHostEvent event) {
     this.role = role;
     this.event = event;
-    this.serviceName = serviceName;
   }
 
   public Role getRole() {
@@ -83,7 +84,41 @@ public class HostRoleCommand {
     this.exitCode = exitCode;
   }
 
-  public String getServiceName() {
-    return serviceName;
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  public long getLastAttemptTime() {
+    return lastAttemptTime;
+  }
+
+  public void setLastAttemptTime(long lastAttemptTime) {
+    this.lastAttemptTime = lastAttemptTime;
+  }
+
+  public short getAttemptCount() {
+    return attemptCount;
+  }
+
+  public void incrementAttemptCount() {
+    this.attemptCount++;
+  }
+  
+  @Override
+  public int hashCode() {
+    return role.hashCode();
+  }
+  
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof HostRoleCommand)) {
+      return false;
+    }
+    HostRoleCommand o = (HostRoleCommand) other;
+    return this.role.equals(o.role);
   }
 }

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java?rev=1397417&r1=1397416&r2=1397417&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java Fri Oct 12 02:03:57 2012
@@ -24,8 +24,11 @@ import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.state.ServiceComponentHostEvent;
 import org.apache.ambari.server.utils.StageUtils;
+import org.mortbay.log.Log;
 
 //This class encapsulates the stage. The stage encapsulates all the information
 //required to persist an action.
@@ -33,13 +36,16 @@ public class Stage {
   private final long requestId;
   private final String clusterName;
   private long stageId = -1;
+  private final String logDir;
 
   //Map of roles to successFactors for this stage. Default is 1 i.e. 100%
   private Map<Role, Float> successFactors = new HashMap<Role, Float>();
 
   //Map of host to host-roles
-  private Map<String, HostAction> hostActions = new TreeMap<String, HostAction>();
-  private final String logDir;
+  private Map<String, Map<String, HostRoleCommand>> hostRoleCommands = 
+      new TreeMap<String, Map<String, HostRoleCommand>>();
+  private Map<String, List<ExecutionCommand>> commandsToSend = 
+      new TreeMap<String, List<ExecutionCommand>>();
 
   public Stage(long requestId, String logDir, String clusterName) {
     this.requestId = requestId;
@@ -52,8 +58,10 @@ public class Stage {
       throw new RuntimeException("Attempt to set stageId again! Not allowed.");
     }
     this.stageId = stageId;
-    for (String host: this.hostActions.keySet()) {
-      this.hostActions.get(host).setCommandId(this.requestId, this.stageId);
+    for (String host: this.commandsToSend.keySet()) {
+      for (ExecutionCommand cmd : this.commandsToSend.get(host)) {
+        cmd.setCommandId(StageUtils.getActionId(requestId, stageId));
+      }
     }
   }
 
@@ -65,21 +73,46 @@ public class Stage {
     return StageUtils.getActionId(requestId, stageId);
   }
 
-  public synchronized void addHostAction(String host, HostAction ha) {
-    ha.setCommandId(requestId, stageId);
-    hostActions.put(host, ha);
-  }
-
-  synchronized HostAction getHostAction(String host) {
-    return hostActions.get(host);
-  }
-
   /**
-   * Returns an internal data structure, please don't modify it.
-   * TODO: Ideally should return an iterator.
+   * A new host role command is created for execution.
+   * Creates both ExecutionCommand and HostRoleCommand objects and
+   * adds them to the Stage. This should be called only once for a host-role
+   * for a given stage.
    */
-  synchronized Map<String, HostAction> getHostActions() {
-    return hostActions;
+  public synchronized void addHostRoleExecutionCommand(String host, Role role,  RoleCommand command, 
+      ServiceComponentHostEvent event, String clusterName, String serviceName) {
+    Log.info("Adding host role command for role: "+role+", command: "+command
+        +", event: "+event+", clusterName: "+clusterName+", serviceName: "+serviceName);
+    HostRoleCommand hrc = new HostRoleCommand(host, role, event);
+    ExecutionCommand cmd = new ExecutionCommand();
+    cmd.setHostname(host);
+    cmd.setClusterName(clusterName);
+    cmd.setServiceName(serviceName);
+    cmd.setCommandId(this.getActionId());
+    cmd.setRole(role);
+    cmd.setRoleCommand(command);
+    Map<String, HostRoleCommand> hrcMap = this.hostRoleCommands.get(host);
+    if (hrcMap == null) {
+      hrcMap = new TreeMap<String, HostRoleCommand>();
+      this.hostRoleCommands.put(host, hrcMap);
+    }
+    if (hrcMap.get(role.toString()) != null) {
+      throw new RuntimeException(
+          "Setting the host role command second time for same stage: stage="
+              + this.getActionId() + ", host=" + host + ", role=" + role);
+    }
+    hrcMap.put(role.toString(), hrc);
+    List<ExecutionCommand> execCmdList = this.commandsToSend.get(host);
+    if (execCmdList == null) {
+      execCmdList = new ArrayList<ExecutionCommand>();
+      this.commandsToSend.put(host, execCmdList);
+    }
+    if (execCmdList.contains(cmd)) {
+      throw new RuntimeException(
+          "Setting the execution command second time for same stage: stage="
+              + this.getActionId() + ", host=" + host + ", role=" + role);
+    }
+    execCmdList.add(cmd);
   }
   
   /**
@@ -88,7 +121,7 @@ public class Stage {
    */
   public synchronized List<String> getHosts() {
     List<String> hlist = new ArrayList<String>();
-    for (String h : getHostActions().keySet()) {
+    for (String h : this.hostRoleCommands.keySet()) {
       hlist.add(h);
     }
     return hlist;
@@ -107,49 +140,87 @@ public class Stage {
     return requestId;
   }
 
-  public String getManifest(String hostName) {
-    // TODO Auto-generated method stub
-    return getHostAction(hostName).getManifest();
-  }
-
   public String getClusterName() {
     return clusterName;
   }
 
-  public long getLastAttemptTime(String host) {
-    return getHostAction(host).getLastAttemptTime();
+  public long getLastAttemptTime(String host, String role) {
+    return this.hostRoleCommands.get(host).get(role).getLastAttemptTime();
   }
 
-  public short getAttemptCount(String host) {
-    return getHostAction(host).getAttemptCount();
+  public short getAttemptCount(String host, String role) {
+    return this.hostRoleCommands.get(host).get(role).getAttemptCount();
   }
 
-  public void incrementAttemptCount(String hostname) {
-    getHostAction(hostname).incrementAttemptCount();
+  public void incrementAttemptCount(String hostname, String role) {
+    this.hostRoleCommands.get(hostname).get(role).incrementAttemptCount();
   }
 
-  public void setLastAttemptTime(String hostname, long t) {
-    getHostAction(hostname).setLastAttemptTime(t);
+  public void setLastAttemptTime(String host, String role, long t) {
+    this.hostRoleCommands.get(host).get(role).setLastAttemptTime(t);
   }
 
-  public ExecutionCommand getExecutionCommand(String hostname) {
-    return getHostAction(hostname).getCommandToHost();
+  public ExecutionCommand getExecutionCommand(String hostname, String role) {
+    for (ExecutionCommand execCmd : this.commandsToSend.get(hostname)) {
+      if (role.equals(execCmd.getRole().toString())) {
+        return execCmd;
+      }
+    }
+    return null;
+  }
+  
+  public List<ExecutionCommand> getExecutionCommands(String hostname) {
+    return this.commandsToSend.get(hostname);
   }
 
-  public long getStartTime(String hostname) {
-    return getHostAction(hostname).getStartTime();
+  public long getStartTime(String hostname, String role) {
+    return this.hostRoleCommands.get(hostname).get(role).getStartTime();
   }
   
-  public void setStartTime(String hostname, long startTime) {
-    getHostAction(hostname).setStartTime(startTime);
+  public void setStartTime(String hostname, String role, long startTime) {
+    this.hostRoleCommands.get(hostname).get(role).setStartTime(startTime);
+  }
+  
+  public HostRoleStatus getHostRoleStatus(String hostname, String role) {
+    return this.hostRoleCommands.get(hostname).get(role).getStatus();
+  }
+  
+  public void setHostRoleStatus(String host, String role,
+      HostRoleStatus status) {
+    this.hostRoleCommands.get(host).get(role).setStatus(status);
+  }
+  
+  public ServiceComponentHostEvent getFsmEvent(String hostname, String roleStr) {
+    return this.hostRoleCommands.get(hostname).get(roleStr).getEvent();
+  }
+  
+
+  public void setExitCode(String hostname, String role, int exitCode) {
+    this.hostRoleCommands.get(hostname).get(role).setExitCode(exitCode);
+  }
+  
+  public int getExitCode(String hostname, String role) {
+    return this.hostRoleCommands.get(hostname).get(role).getExitCode();
+  }
+
+  public void setStderr(String hostname, String role, String stdErr) {
+    this.hostRoleCommands.get(hostname).get(role).setStderr(stdErr);
+  }
+
+  public void setStdout(String hostname, String role, String stdOut) {
+    this.hostRoleCommands.get(hostname).get(role).setStdout(stdOut);
   }
   
   public synchronized boolean isStageInProgress() {
-    for(String host: hostActions.keySet()) {
-      for (HostRoleCommand role : hostActions.get(host).getRoleCommands()) {
-        if (role.getStatus().equals(HostRoleStatus.PENDING) ||
-            role.getStatus().equals(HostRoleStatus.QUEUED) || 
-            role.getStatus().equals(HostRoleStatus.IN_PROGRESS)) {
+    for(String host: hostRoleCommands.keySet()) {
+      for (String role : hostRoleCommands.get(host).keySet()) {
+        HostRoleCommand hrc = hostRoleCommands.get(host).get(role);
+        if (hrc == null) {
+          return false;
+        }
+        if (hrc.getStatus().equals(HostRoleStatus.PENDING) ||
+            hrc.getStatus().equals(HostRoleStatus.QUEUED) || 
+            hrc.getStatus().equals(HostRoleStatus.IN_PROGRESS)) {
           return true;
         }
       }

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java?rev=1397417&r1=1397416&r2=1397417&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java Fri Oct 12 02:03:57 2012
@@ -17,11 +17,12 @@
  */
 package org.apache.ambari.server.agent;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
 import org.codehaus.jackson.annotate.JsonProperty;
 
 
@@ -34,21 +35,17 @@ public class ExecutionCommand extends Ag
   public ExecutionCommand() {
     super(AgentCommandType.EXECUTION_COMMAND);
   }
-
   private String clusterName;
-
   private String commandId;
-
   private String hostname;
-  
-  private Map<String, String> params = new HashMap<String, String>();
-
+  private Role role;
+  private Map<String, String> hostLevelParams = new HashMap<String, String>();
+  private Map<String, String> roleParams = null;
+  private RoleCommand roleCommand;
   private Map<String, List<String>> clusterHostInfo = 
       new HashMap<String, List<String>>();
-
-  private List<RoleExecution> rolesCommands;
   private Map<String, Map<String, String>> configurations;
-
+  private String serviceName;
   
   @JsonProperty("commandId")
   public String getCommandId() {
@@ -60,20 +57,6 @@ public class ExecutionCommand extends Ag
     this.commandId = commandId;
   }
   
-  public synchronized void addRoleCommand(String role, String cmd,
-      Map<String, String> roleParams) {
-    RoleExecution rec = new RoleExecution(role, cmd, roleParams);
-    if (rolesCommands == null) {
-      rolesCommands = new ArrayList<RoleExecution>();
-    }
-    rolesCommands.add(rec);
-  }
-  
-  @JsonProperty("roleCommands")
-  public synchronized List<RoleExecution> getRoleCommands() {
-    return this.rolesCommands;
-  }
-  
   @Override
   public boolean equals(Object other) {
     if (!(other instanceof ExecutionCommand)) {
@@ -81,73 +64,50 @@ public class ExecutionCommand extends Ag
     }
     ExecutionCommand o = (ExecutionCommand) other;
     return (this.commandId == o.commandId &&
-            this.hostname == o.hostname);
+            this.hostname == o.hostname &&
+            this.role == o.role &&
+            this.roleCommand == o.roleCommand);
   }
   
   @Override
   public String toString() {
-    return "Host=" + hostname + ", commandId="+commandId;
+    return "Host=" + hostname + ", commandId=" + commandId + ", role=" + role
+        + ", command=" + roleCommand;
   }
 
   @Override
   public int hashCode() {
-    return (hostname + commandId).hashCode();
+    return (hostname + commandId + role).hashCode();
+  }
+    
+  @JsonProperty("role")
+  public Role getRole() {
+    return role;
   }
 
-  /**
-   * Role Execution commands sent to the 
-   *
-   */
-  public static class RoleExecution {
-
-    private String role;
-
-    private Map<String, String> roleParams = null;
+  @JsonProperty("role")
+  public void setRole(Role role) {
+    this.role = role;
+  }
 
-    private String cmd;
+  @JsonProperty("roleParams")
+  public Map<String, String> getRoleParams() {
+    return roleParams;
+  }
 
-    public RoleExecution() {}
-    
-    public RoleExecution(String role, String cmd,
-        Map<String, String> roleParams) {
-      this.role = role;
-      this.cmd = cmd;
-      this.roleParams = roleParams;
-    }
-    
-    @JsonProperty("role")
-    public String getRole() {
-      return role;
-    }
-    
-    @JsonProperty("role")
-    public void setRole(String role) {
-      this.role = role;
-    }
-    
-    @JsonProperty("roleParams")
-    public Map<String, String> getRoleParams() {
-      return roleParams;
-    }
+  @JsonProperty("roleParams")
+  public void setRoleParams(Map<String, String> roleParams) {
+    this.roleParams = roleParams;
+  }
 
-    @JsonProperty("roleParams")
-    public void setRoleParams(Map<String, String> roleParams) {
-      this.roleParams = roleParams;
-    }
+  @JsonProperty("roleCommand")
+  public RoleCommand getRoleCommand() {
+    return roleCommand;
+  }
 
-    @JsonProperty("cmd")
-    public String getCmd() {
-      return cmd;
-    }
-    
-    @JsonProperty("cmd")
-    public void setCmd(String cmd) {
-      this.cmd = cmd;
-    }
-    
-    public String toString() {
-      return null;
-    }
+  @JsonProperty("roleCommand")
+  public void setRoleCommand(RoleCommand cmd) {
+    this.roleCommand = cmd;
   }
   
   @JsonProperty("clusterName")
@@ -170,14 +130,14 @@ public class ExecutionCommand extends Ag
     this.hostname = hostname;
   }
 
-  @JsonProperty("params")
-  public Map<String, String> getParams() {
-    return params;
+  @JsonProperty("hostLevelParams")
+  public Map<String, String> getHostLevelParams() {
+    return hostLevelParams;
   }
 
-  @JsonProperty("params")
-  public void setParams(Map<String, String> params) {
-    this.params = params;
+  @JsonProperty("hostLevelParams")
+  public void setHostLevelParams(Map<String, String> params) {
+    this.hostLevelParams = params;
   }
 
   @JsonProperty("clusterHostInfo")
@@ -199,4 +159,14 @@ public class ExecutionCommand extends Ag
   public void setConfigurations(Map<String, Map<String, String>> configurations) {
     this.configurations = configurations;
   }
+
+  @JsonProperty("serviceName")
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  @JsonProperty("serviceName")
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
 }

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java?rev=1397417&r1=1397416&r2=1397417&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java Fri Oct 12 02:03:57 2012
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.HostNotFoundException;
 import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
 import org.apache.ambari.server.state.AgentVersion;
@@ -98,8 +99,6 @@ public class HeartBeatHandler {
 
     //Examine heartbeat for command reports
     List<CommandReport> reports = heartbeat.getReports();
-    actionManager.actionResponse(hostname, reports);
-    //Update state machines from reports
     for (CommandReport report : reports) {
       String clusterName = report.getClusterName();
       if ((clusterName == null) || "".equals(clusterName)) {
@@ -108,7 +107,7 @@ public class HeartBeatHandler {
       Cluster cl = clusterFsm.getCluster(report.getClusterName());
       String service = report.getServiceName();
       if (service == null || "".equals(service)) {
-        service = "HDFS";
+        throw new AmbariException("Invalid command report, service: "+service);
       }
       Service svc = cl.getService(service);
       ServiceComponent svcComp = svc.getServiceComponent(
@@ -129,6 +128,8 @@ public class HeartBeatHandler {
       }
       LOG.info("Report for "+report.toString() +", processed successfully");
     }
+    //Record the command reports with the action manager
+    actionManager.actionResponse(hostname, reports);
 
     // Examine heartbeart for component status
     Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
@@ -176,10 +177,13 @@ public class HeartBeatHandler {
       throws InvalidStateTransitonException, AmbariException {
     String hostname = register.getHostname();
     long now = System.currentTimeMillis();
-    if (clusterFsm.getHost(hostname) == null) {
+    Host hostObject;
+    try {
+      hostObject = clusterFsm.getHost(hostname);
+    } catch (HostNotFoundException ex) {
       clusterFsm.addHost(hostname);
+      hostObject = clusterFsm.getHost(hostname);
     }
-    Host hostObject = clusterFsm.getHost(hostname);
     List<StatusCommand> cmds = new ArrayList<StatusCommand>();
     for (Cluster cl : clusterFsm.getClustersForHost(hostname)) {
       List<ServiceComponentHost> roleList = cl

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java?rev=1397417&r1=1397416&r2=1397417&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java Fri Oct 12 02:03:57 2012
@@ -24,7 +24,6 @@ import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.actionmanager.HostAction;
-import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.Stage;
 import org.apache.ambari.server.agent.ExecutionCommand;
 import org.apache.ambari.server.state.Cluster;
@@ -434,25 +433,18 @@ public class AmbariManagementControllerI
     return stage;
   }
 
-  private HostAction createHostAction(Stage stage, ServiceComponentHost scHost,
+  private void createHostAction(Stage stage, ServiceComponentHost scHost,
       Map<String, Config> configs,
       RoleCommand command,
       long nowTimestamp) {
 
-    HostAction ha = new HostAction(scHost.getHostName());
-
-    HostRoleCommand cmd = new HostRoleCommand(scHost.getServiceName(),
-        Role.valueOf(scHost.getServiceComponentName()),
+    stage.addHostRoleExecutionCommand(scHost.getHostName(), Role.valueOf(scHost
+        .getServiceComponentName()), command,
         new ServiceComponentHostInstallEvent(scHost.getServiceComponentName(),
-            scHost.getHostName(), nowTimestamp));
-
-
-    ha.addHostRoleCommand(cmd);
-
-    ExecutionCommand execCmd = ha.getCommandToHost();
-    execCmd.setCommandId(stage.getActionId());
-    execCmd.setClusterName(scHost.getClusterName());
-
+            scHost.getHostName(), nowTimestamp), scHost.getClusterName(),
+        scHost.getServiceName());
+    ExecutionCommand execCmd = stage.getExecutionCommand(scHost.getHostName(),
+        scHost.getServiceComponentName());
     // Generate cluster host info
     // TODO fix - use something from somewhere to generate this at some point
     Map<String, List<String>> clusterHostInfo =
@@ -476,15 +468,14 @@ public class AmbariManagementControllerI
 
     Map<String, String> params = new TreeMap<String, String>();
     params.put("magic_param", "/x/y/z");
-    execCmd.setParams(params);
+    execCmd.setHostLevelParams(params);
 
     Map<String, String> roleParams = new TreeMap<String, String>();
     roleParams.put("magic_role_param", "false");
 
-    execCmd.addRoleCommand(scHost.getServiceComponentName(),
-        command.toString(), roleParams);
+    execCmd.setRoleParams(roleParams);
 
-    return ha;
+    return;
   }
 
   @Override
@@ -841,9 +832,8 @@ public class AmbariManagementControllerI
             throw new AmbariException("Unsupported state change operation");
         }
 
-        HostAction ha = createHostAction(stage, scHost, configs, roleCommand,
+        createHostAction(stage, scHost, configs, roleCommand,
             nowTimestamp);
-        stage.addHostAction(scHost.getHostName(), ha);
       }
       if (LOG.isDebugEnabled()) {
         LOG.debug("Adding new stage for updateService request"
@@ -987,9 +977,8 @@ public class AmbariManagementControllerI
           throw new AmbariException("Unsupported state change operation");
       }
 
-      HostAction ha = createHostAction(stage, scHost, configs, roleCommand,
+      createHostAction(stage, scHost, configs, roleCommand,
           nowTimestamp);
-      stage.addHostAction(scHost.getHostName(), ha);
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Adding new stage for updateComponent request"
@@ -1147,9 +1136,7 @@ public class AmbariManagementControllerI
         throw new AmbariException("Unsupported state change operation");
     }
 
-    HostAction ha = createHostAction(stage, sch, configs, roleCommand,
-        nowTimestamp);
-    stage.addHostAction(sch.getHostName(), ha);
+    createHostAction(stage, sch, configs, roleCommand, nowTimestamp);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Adding new stage for createHostComponent request"
           + ", clusterName=" + request.getClusterName()

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java?rev=1397417&r1=1397416&r2=1397417&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java Fri Oct 12 02:03:57 2012
@@ -217,9 +217,9 @@ public class AmbariServer {
       manager.start();
       LOG.info("********* Started ActionManager **********");
 
-      //RequestInjectorForTest testInjector = new RequestInjectorForTest(controller, clusters);
-      //Thread testInjectorThread = new Thread(testInjector);
-      //testInjectorThread.start();
+      RequestInjectorForTest testInjector = new RequestInjectorForTest(controller, clusters);
+      Thread testInjectorThread = new Thread(testInjector);
+      testInjectorThread.start();
       
       server.join();
       LOG.info("Joined the Server");

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java?rev=1397417&r1=1397416&r2=1397417&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java Fri Oct 12 02:03:57 2012
@@ -28,9 +28,7 @@ import java.util.TreeMap;
 import javax.xml.bind.JAXBException;
 
 import org.apache.ambari.server.Role;
-import org.apache.ambari.server.actionmanager.HostAction;
-import org.apache.ambari.server.actionmanager.HostRoleCommand;
-import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.actionmanager.Stage;
 import org.apache.ambari.server.agent.ExecutionCommand;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
@@ -59,24 +57,21 @@ public class StageUtils {
   }
 
   //For testing only
-  public static Stage getATestStage(long requestId, long stageId) {
-    String hostname;
-    try {
-      hostname = InetAddress.getLocalHost().getHostName();
-    } catch (UnknownHostException e) {
-      hostname = "host-dummy";
+  public static Stage getATestStage(long requestId, long stageId, String hostname) {
+    if (hostname == null || "".equals(hostname)) {
+      try {
+        hostname = InetAddress.getLocalHost().getHostName();
+      } catch (UnknownHostException e) {
+        hostname = "host-dummy";
+      }
     }
     Stage s = new Stage(requestId, "/tmp", "cluster1");
     s.setStageId(stageId);
-    HostAction ha = new HostAction(hostname);
     long now = System.currentTimeMillis();
-    HostRoleCommand hrc = new HostRoleCommand("HDFS", Role.NAMENODE, 
-        new ServiceComponentHostInstallEvent("NAMENODE", hostname, now));
-    hrc.setStatus(HostRoleStatus.PENDING);
-    ha.addHostRoleCommand(hrc);
-    ExecutionCommand execCmd = ha.getCommandToHost();
+    s.addHostRoleExecutionCommand(hostname, Role.NAMENODE, RoleCommand.INSTALL,
+        new ServiceComponentHostInstallEvent("NAMENODE", hostname, now), "cluster1", "HDFS");
+    ExecutionCommand execCmd = s.getExecutionCommand(hostname, "NAMENODE");
     execCmd.setCommandId(s.getActionId());
-    execCmd.setClusterName("cluster1");
     Map<String, List<String>> clusterHostInfo = new TreeMap<String, List<String>>();
     List<String> slaveHostList = new ArrayList<String>();
     slaveHostList.add(hostname);
@@ -91,16 +86,15 @@ public class StageUtils {
     execCmd.setConfigurations(configurations);
     Map<String, String> params = new TreeMap<String, String>();
     params.put("jdklocation", "/x/y/z");
-    execCmd.setParams(params);
+    execCmd.setHostLevelParams(params);
     Map<String, String> roleParams = new TreeMap<String, String>();
     roleParams.put("format", "false");
-    execCmd.addRoleCommand("NAMENODE", "INSTALL", roleParams);
+    execCmd.setRoleParams(roleParams);
     try {
       LOG.info("Command string = " + StageUtils.jaxbToString(execCmd));
     } catch (Exception e) {
       throw new RuntimeException("Could not get string from jaxb",e);
     }
-    s.addHostAction(hostname, ha);
     return s;
   }
   

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java?rev=1397417&r1=1397416&r2=1397417&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java Fri Oct 12 02:03:57 2012
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.state.cluster.ClustersImpl;
@@ -50,21 +51,19 @@ public class TestActionManager {
     cr.setExitCode(215);
     reports.add(cr);
     am.actionResponse(hostname, reports);
-    assertEquals(215, am.getAction(requestId, stageId).getHostAction(hostname)
-        .getRoleCommands().get(0).getExitCode());
+    assertEquals(215,
+        am.getAction(requestId, stageId).getExitCode(hostname, "HBASE_MASTER"));
     assertEquals(HostRoleStatus.COMPLETED, am.getAction(requestId, stageId)
-        .getHostAction(hostname).getRoleCommands().get(0).getStatus());
+        .getHostRoleStatus(hostname, "HBASE_MASTER"));
   }
 
   private void populateActionDB(ActionDBAccessor db, String hostname) {
     Stage s = new Stage(requestId, "/a/b", "cluster1");
     s.setStageId(stageId);
-    HostAction ha = new HostAction(hostname);
-    HostRoleCommand cmd = new HostRoleCommand("HBASE", Role.HBASE_MASTER,
+    s.addHostRoleExecutionCommand(hostname, Role.HBASE_MASTER,
+        RoleCommand.START,
         new ServiceComponentHostStartEvent(Role.HBASE_MASTER.toString(),
-            hostname, System.currentTimeMillis()));
-    ha.addHostRoleCommand(cmd);
-    s.addHostAction(hostname, ha);
+            hostname, System.currentTimeMillis()), "cluster1", "HBASE");
     List<Stage> stages = new ArrayList<Stage>();
     stages.add(s);
     db.persistActions(stages);

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java?rev=1397417&r1=1397416&r2=1397417&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java Fri Oct 12 02:03:57 2012
@@ -17,8 +17,13 @@
  */
 package org.apache.ambari.server.actionmanager;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -32,7 +37,7 @@ import org.apache.ambari.server.state.Cl
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.ServiceComponentHost;
-import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
+import org.apache.ambari.server.utils.StageUtils;
 import org.junit.Test;
 
 public class TestActionScheduler {
@@ -57,15 +62,9 @@ public class TestActionScheduler {
 
     ActionDBAccessor db = mock(ActionDBAccessorImpl.class);
     List<Stage> stages = new ArrayList<Stage>();
-    Stage s = new Stage(1, "/bogus", "clusterName");
-    s.setStageId(977);
-    stages.add(s);
     String hostname = "ahost.ambari.apache.org";
-    HostAction ha = new HostAction(hostname);
-    HostRoleCommand hrc = new HostRoleCommand("HDFS", Role.DATANODE,
-        new ServiceComponentHostInstallEvent(Role.DATANODE.toString(), hostname, 35678901));
-    ha.addHostRoleCommand(hrc);
-    s.addHostAction(hostname, ha);
+    Stage s = StageUtils.getATestStage(1, 977, hostname);
+    stages.add(s);
     when(db.getStagesInProgress()).thenReturn(stages);
 
     //Keep large number of attempts so that the task is not expired finally
@@ -88,7 +87,7 @@ public class TestActionScheduler {
     assertEquals("1-977", ((ExecutionCommand) (ac.get(0))).getCommandId());
 
     //Now change the action status
-    hrc.setStatus(HostRoleStatus.COMPLETED);
+    s.setHostRoleStatus(hostname, "NAMENODE", HostRoleStatus.COMPLETED);
     ac = aq.dequeueAll(hostname);
 
     //Wait for sometime, it shouldn't be scheduled this time.
@@ -115,16 +114,10 @@ public class TestActionScheduler {
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
 
     ActionDBAccessorImpl db = mock(ActionDBAccessorImpl.class);
+    String hostname = "ahost.ambari.apache.org";
     List<Stage> stages = new ArrayList<Stage>();
-    Stage s = new Stage(1, "/bogus", "clusterName");
-    s.setStageId(977);
+    Stage s = StageUtils.getATestStage(1, 977, hostname);
     stages.add(s);
-    String hostname = "ahost.ambari.apache.org";
-    HostAction ha = new HostAction(hostname);
-    HostRoleCommand hrc = new HostRoleCommand("HDFS", Role.DATANODE,
-        null);
-    ha.addHostRoleCommand(hrc);
-    s.addHostAction(hostname, ha);
     when(db.getStagesInProgress()).thenReturn(stages);
 
     //Keep large number of attempts so that the task is not expired finally
@@ -136,6 +129,6 @@ public class TestActionScheduler {
     Thread.sleep(500);
     //TODO timeoutHostRole must be called exactly once but in this case the state
     //in the db continues to be pending therefore it is processed multiple times.
-    verify(db, atLeastOnce()).timeoutHostRole(hostname, 1, 977, Role.DATANODE);
+    verify(db, atLeastOnce()).timeoutHostRole(hostname, 1, 977, Role.NAMENODE);
   }
 }

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java?rev=1397417&r1=1397416&r2=1397417&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java Fri Oct 12 02:03:57 2012
@@ -412,7 +412,7 @@ public class AmbariManagementControllerT
             + ", stageId=" + stage.getStageId()
             + ", actionId=" + stage.getActionId()
             + ", commandDetails="
-            + StageUtils.jaxbToString(stage.getExecutionCommand(host)));
+            + StageUtils.jaxbToString(stage.getExecutionCommands(host).get(0)));
       }
     }
 
@@ -446,7 +446,7 @@ public class AmbariManagementControllerT
             + ", stageId=" + stage.getStageId()
             + ", actionId=" + stage.getActionId()
             + ", commandDetails="
-            + StageUtils.jaxbToString(stage.getExecutionCommand(host)));
+            + StageUtils.jaxbToString(stage.getExecutionCommands(host).get(0)));
       }
     }
 

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/utils/TestStageUtils.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/utils/TestStageUtils.java?rev=1397417&r1=1397416&r2=1397417&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/utils/TestStageUtils.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/utils/TestStageUtils.java Fri Oct 12 02:03:57 2012
@@ -19,6 +19,8 @@ package org.apache.ambari.server.utils;
 
 import static org.junit.Assert.*;
 
+import java.util.List;
+
 import javax.xml.bind.JAXBException;
 
 import org.apache.ambari.server.actionmanager.Stage;
@@ -33,23 +35,25 @@ public class TestStageUtils {
   
   @Test
   public void testGetATestStage() {
-    Stage s = StageUtils.getATestStage(1, 2);
+    Stage s = StageUtils.getATestStage(1, 2, "host2");
     String hostname = s.getHosts().get(0);
-    ExecutionCommand cmd = s.getExecutionCommand(hostname);
-    assertEquals("cluster1", cmd.getClusterName());
-    assertEquals(StageUtils.getActionId(1, 2), cmd.getCommandId());
-    assertEquals(hostname, cmd.getHostname());
+    List<ExecutionCommand> cmds = s.getExecutionCommands(hostname);
+    for (ExecutionCommand cmd : cmds) {
+      assertEquals("cluster1", cmd.getClusterName());
+      assertEquals(StageUtils.getActionId(1, 2), cmd.getCommandId());
+      assertEquals(hostname, cmd.getHostname());
+    }
   }
   
   @Test
   public void testJaxbToString() throws Exception {
-    Stage s = StageUtils.getATestStage(1, 2);
+    Stage s = StageUtils.getATestStage(1, 2, "host1");
     String hostname = s.getHosts().get(0);
-    ExecutionCommand cmd = s.getExecutionCommand(hostname);
-    LOG.info("Command is " + StageUtils.jaxbToString(cmd.getRoleCommands()));
+    List<ExecutionCommand> cmds = s.getExecutionCommands(hostname);
+    for (ExecutionCommand cmd: cmds) {
+      LOG.info("Command is " + StageUtils.jaxbToString(cmd));
+    }
     assertEquals(StageUtils.getActionId(1, 2), s.getActionId());
-    String jaxbString = StageUtils.jaxbToString(cmd);
-    LOG.info("String = "+jaxbString);
   }
 
 }



Mime
View raw message