ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jonathanhur...@apache.org
Subject ambari git commit: AMBARI-9334 - Ambari StageDAO.findByCommandStatuses causes Postgress HIGH CPU (jonathanhurley)
Date Wed, 04 Mar 2015 21:43:04 GMT
Repository: ambari
Updated Branches:
  refs/heads/branch-1.7.0 3271d1d1f -> 7566e570a


AMBARI-9334 - Ambari StageDAO.findByCommandStatuses causes Postgress HIGH CPU (jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7566e570
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7566e570
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7566e570

Branch: refs/heads/branch-1.7.0
Commit: 7566e570aff4622e4eee7023325583244564a3af
Parents: 3271d1d
Author: Jonathan Hurley <jhurley@hortonworks.com>
Authored: Wed Mar 4 16:06:02 2015 -0500
Committer: Jonathan Hurley <jhurley@hortonworks.com>
Committed: Wed Mar 4 16:06:02 2015 -0500

----------------------------------------------------------------------
 .../server/actionmanager/ActionDBAccessor.java  |  33 ++--
 .../actionmanager/ActionDBAccessorImpl.java     |  85 ++++++----
 .../server/actionmanager/ActionManager.java     |  29 ++--
 .../server/actionmanager/ActionScheduler.java   |  47 ++++--
 .../server/actionmanager/HostRoleStatus.java    |   7 +
 .../server/orm/dao/HostRoleCommandDAO.java      |  66 ++++++--
 .../apache/ambari/server/orm/dao/StageDAO.java  |  42 ++---
 .../orm/entities/HostRoleCommandEntity.java     |  87 ++++++++---
 .../ambari/server/orm/entities/StageEntity.java |  66 ++++++--
 .../actionmanager/TestActionDBAccessorImpl.java | 156 ++++++++++++++++---
 .../actionmanager/TestActionScheduler.java      | 100 ++++++------
 11 files changed, 497 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
index 1f99b4a..725fa69 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
@@ -17,15 +17,15 @@
  */
 package org.apache.ambari.server.actionmanager;
 
-import com.google.inject.persist.Transactional;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.agent.ExecutionCommand;
 
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import com.google.inject.persist.Transactional;
 
 public interface ActionDBAccessor {
 
@@ -57,12 +57,26 @@ public interface ActionDBAccessor {
   public void timeoutHostRole(String host, long requestId, long stageId, String role);
 
   /**
-   * Returns all the pending stages, including queued and not-queued.
-   * A stage is considered in progress if it is in progress for any host.
+   * Returns all the pending stages, including queued and not-queued. A stage is
+   * considered in progress if it is in progress for any host.
+   * <p/>
+   * The results will be sorted by request ID and then stage ID making this call
+   * expensive in some scenarios. Use {@link #getCommandsInProgressCount()} in
+   * order to determine if there are stages that are in progress before getting
+   * the stages from this method.
+   *
+   * @see HostRoleStatus#IN_PROGRESS_STATUSES
    */
   public List<Stage> getStagesInProgress();
 
   /**
+   * Gets the number of commands in progress.
+   *
+   * @return the number of commands in progress.
+   */
+  public int getCommandsInProgressCount();
+
+  /**
    * Persists all tasks for a given request
    * @param request request object
    */
@@ -149,11 +163,6 @@ public interface ActionDBAccessor {
   public Collection<HostRoleCommand> getTasks(Collection<Long> taskIds);
 
   /**
-   * Get all stages that contain tasks with specified host role statuses
-   */
-  public List<Stage> getStagesByHostRoleStatus(Set<HostRoleStatus> statuses);
-
-  /**
    * Gets the host role command corresponding to the task id
    */
   public HostRoleCommand getTask(long taskId);

http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 5e879cc..220fb95 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -17,12 +17,18 @@
  */
 package org.apache.ambari.server.actionmanager;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.google.inject.name.Named;
-import com.google.inject.persist.Transactional;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.agent.ExecutionCommand;
@@ -48,49 +54,55 @@ import org.apache.ambari.server.utils.StageUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+import com.google.inject.persist.Transactional;
 
 @Singleton
 public class ActionDBAccessorImpl implements ActionDBAccessor {
   private static final Logger LOG = LoggerFactory.getLogger(ActionDBAccessorImpl.class);
+
   private long requestId;
+
   @Inject
   ClusterDAO clusterDAO;
+
   @Inject
   HostDAO hostDAO;
+
   @Inject
   RequestDAO requestDAO;
+
   @Inject
   StageDAO stageDAO;
+
   @Inject
   HostRoleCommandDAO hostRoleCommandDAO;
+
   @Inject
   ExecutionCommandDAO executionCommandDAO;
+
   @Inject
   RoleSuccessCriteriaDAO roleSuccessCriteriaDAO;
+
   @Inject
   StageFactory stageFactory;
+
   @Inject
   RequestFactory requestFactory;
+
   @Inject
   HostRoleCommandFactory hostRoleCommandFactory;
+
   @Inject
   Clusters clusters;
+
   @Inject
   RequestScheduleDAO requestScheduleDAO;
 
-
-
   private Cache<Long, HostRoleCommand> hostRoleCommandCache;
   private long cacheLimit; //may be exceeded to store tasks from one request
 
@@ -186,21 +198,34 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     endRequestIfCompleted(requestId);
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#getPendingStages()
+  /**
+   * {@inheritDoc}
    */
   @Override
   public List<Stage> getStagesInProgress() {
     List<Stage> stages = new ArrayList<Stage>();
-    List<HostRoleStatus> statuses =
-        Arrays.asList(HostRoleStatus.QUEUED, HostRoleStatus.IN_PROGRESS,
-          HostRoleStatus.PENDING);
-    for (StageEntity stageEntity : stageDAO.findByCommandStatuses(statuses)) {
+    List<StageEntity> stageEntities = stageDAO.findByCommandStatuses(HostRoleStatus.IN_PROGRESS_STATUSES);
+
+    for (StageEntity stageEntity : stageEntities) {
       stages.add(stageFactory.createExisting(stageEntity));
     }
+
     return stages;
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int getCommandsInProgressCount() {
+    Number count = hostRoleCommandDAO.getCountByStatus(HostRoleStatus.IN_PROGRESS_STATUSES);
+    if (null == count) {
+      return 0;
+    }
+
+    return count.intValue();
+  }
+
   @Override
   @Transactional
   public void persistActions(Request request) throws AmbariException {
@@ -212,7 +237,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     if (clusterEntity != null) {
       clusterId = clusterEntity.getClusterId();
     }
-    
+
     requestEntity.setClusterId(clusterId);
     requestDAO.create(requestEntity);
 
@@ -550,14 +575,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   }
 
   @Override
-  public List<Stage> getStagesByHostRoleStatus(Set<HostRoleStatus> statuses) {
-    List<Stage> stages = new ArrayList<Stage>();
-    for (StageEntity stageEntity : stageDAO.findByCommandStatuses(statuses)) {
-      stages.add(stageFactory.createExisting(stageEntity));
-    }
-    return stages;
-  }
-
   public HostRoleCommand getTask(long taskId) {
     HostRoleCommandEntity commandEntity = hostRoleCommandDAO.findByPK((int) taskId);
     if (commandEntity == null) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
index e2fad5f..fb0b3aa 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
@@ -17,14 +17,16 @@
  */
 package org.apache.ambari.server.actionmanager;
 
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.google.inject.name.Named;
-import com.google.inject.persist.UnitOfWork;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.CommandReport;
-import org.apache.ambari.server.api.services.BaseRequest;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.ExecuteActionRequest;
 import org.apache.ambari.server.controller.HostsMap;
@@ -34,13 +36,10 @@ import org.apache.ambari.server.utils.StageUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+import com.google.inject.persist.UnitOfWork;
 
 
 /**
@@ -62,7 +61,7 @@ public class ActionManager {
                        ActionQueue aq, Clusters fsm, ActionDBAccessor db, HostsMap hostsMap,
                        ServerActionManager serverActionManager, UnitOfWork unitOfWork,
                        RequestFactory requestFactory, Configuration configuration) {
-    this.actionQueue = aq;
+    actionQueue = aq;
     this.db = db;
     scheduler = new ActionScheduler(schedulerSleepTime, actionTimeout, db,
         actionQueue, fsm, 2, hostsMap, serverActionManager, unitOfWork, configuration);
@@ -204,10 +203,6 @@ public class ActionManager {
     return db.getTasks(taskIds);
   }
 
-  public List<Stage> getRequestsByHostRoleStatus(Set<HostRoleStatus> statuses) {
-    return db.getStagesByHostRoleStatus(statuses);
-  }
-
   /**
    * Get first or last maxResults requests that are in the specified status
    *

http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 81fee75..e043d79 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -128,22 +128,22 @@ class ActionScheduler implements Runnable {
       ActionDBAccessor db, ActionQueue actionQueue, Clusters fsmObject,
       int maxAttempts, HostsMap hostsMap, ServerActionManager serverActionManager,
       UnitOfWork unitOfWork, Configuration configuration) {
-    this.sleepTime = sleepTimeMilliSec;
+    sleepTime = sleepTimeMilliSec;
     this.hostsMap = hostsMap;
-    this.actionTimeout = actionTimeoutMilliSec;
+    actionTimeout = actionTimeoutMilliSec;
     this.db = db;
     this.actionQueue = actionQueue;
     this.fsmObject = fsmObject;
     this.maxAttempts = (short) maxAttempts;
     this.serverActionManager = serverActionManager;
     this.unitOfWork = unitOfWork;
-    this.clusterHostInfoCache = CacheBuilder.newBuilder().
+    clusterHostInfoCache = CacheBuilder.newBuilder().
         expireAfterAccess(5, TimeUnit.MINUTES).
         build();
-    this.commandParamsStageCache = CacheBuilder.newBuilder().
+    commandParamsStageCache = CacheBuilder.newBuilder().
       expireAfterAccess(5, TimeUnit.MINUTES).
       build();
-    this.hostParamsStageCache = CacheBuilder.newBuilder().
+    hostParamsStageCache = CacheBuilder.newBuilder().
       expireAfterAccess(5, TimeUnit.MINUTES).
       build();
     this.configuration = configuration;
@@ -202,6 +202,18 @@ class ActionScheduler implements Runnable {
       // The first thing to do is to abort requests that are cancelled
       processCancelledRequestsList();
 
+      // !!! getting the stages in progress could be a very expensive call due
+      // to the join being used; there's no need to make it if there are
+      // no commands in progress
+      if (db.getCommandsInProgressCount() == 0) {
+        // Nothing to do
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("There are no stages currently in progress.");
+        }
+
+        return;
+      }
+
       Set<Long> runningRequestIds = new HashSet<Long>();
       List<Stage> stages = db.getStagesInProgress();
       if (LOG.isDebugEnabled()) {
@@ -209,14 +221,14 @@ class ActionScheduler implements Runnable {
         LOG.debug("Processing {} in progress stages ", stages.size());
       }
       if (stages.isEmpty()) {
-        //Nothing to do
+        // Nothing to do
         if (LOG.isDebugEnabled()) {
-          LOG.debug("No stage in progress..nothing to do");
+          LOG.debug("There are no stages currently in progress.");
         }
         return;
       }
       int i_stage = 0;
-      
+
       stages = filterParallelPerHostStages(stages);
 
       boolean exclusiveRequestIsGoing = false;
@@ -420,7 +432,7 @@ class ActionScheduler implements Runnable {
       s.setHostRoleStatus(hostName, roleName, HostRoleStatus.QUEUED);
       db.hostRoleScheduled(s, hostName, roleName);
       String actionName = cmd.getRoleParams().get(ServerAction.ACTION_NAME);
-      this.serverActionManager.executeAction(actionName, cmd.getCommandParams());
+      serverActionManager.executeAction(actionName, cmd.getCommandParams());
       reportServerActionSuccess(s, cmd);
 
     } catch (AmbariException e) {
@@ -569,12 +581,12 @@ class ActionScheduler implements Runnable {
 
         // Check that service host component is not deleted
         if (hostDeleted) {
-          
+
           String message = String.format(
             "Host not found when trying to schedule an execution command. " +
             "The most probable reason for that is that host or host component " +
             "has been deleted recently. The command has been aborted and dequeued." +
-            "Execution command details: " + 
+            "Execution command details: " +
             "cmdId: %s; taskId: %s; roleCommand: %s",
             c.getCommandId(), c.getTaskId(), c.getRoleCommand());
           LOG.warn("Host {} has been detected as non-available. {}", host, message);
@@ -609,7 +621,7 @@ class ActionScheduler implements Runnable {
           LOG.trace("===>commandsToSchedule(first_time)=" + commandsToSchedule.size());
         }
 
-        this.updateRoleStats(status, roleStats.get(roleStr));
+        updateRoleStats(status, roleStats.get(roleStr));
       }
     }
     LOG.debug("Collected {} commands to schedule in this wakeup.", commandsToSchedule.size());
@@ -772,7 +784,7 @@ class ActionScheduler implements Runnable {
     }
 
     cmd.setClusterHostInfo(clusterHostInfo);
- 
+
     //Try to get commandParams from cache and merge them with command-level parameters
     Map<String, String> commandParams = commandParamsStageCache.getIfPresent(stagePk);
 
@@ -888,10 +900,10 @@ class ActionScheduler implements Runnable {
       LOG.error("Unknown status " + status.name());
     }
   }
-  
-  
+
+
   public void setTaskTimeoutAdjustment(boolean val) {
-    this.taskTimeoutAdjustment = val;
+    taskTimeoutAdjustment = val;
   }
 
   static class RoleStats {
@@ -906,7 +918,7 @@ class ActionScheduler implements Runnable {
     final float successFactor;
 
     RoleStats(int total, float successFactor) {
-      this.totalHosts = total;
+      totalHosts = total;
       this.successFactor = successFactor;
     }
 
@@ -938,6 +950,7 @@ class ActionScheduler implements Runnable {
       }
     }
 
+    @Override
     public String toString() {
       StringBuilder builder = new StringBuilder();
       builder.append("numQueued="+numQueued);

http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java
index 447aead..b0ebe83 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java
@@ -19,6 +19,7 @@ package org.apache.ambari.server.actionmanager;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 
 public enum HostRoleStatus {
@@ -34,6 +35,12 @@ public enum HostRoleStatus {
   private static List<HostRoleStatus> COMPLETED_STATES = Arrays.asList(FAILED, TIMEDOUT, ABORTED, COMPLETED);
   private static List<HostRoleStatus> FAILED_STATES = Arrays.asList(FAILED, TIMEDOUT, ABORTED);
 
+  /**
+   * The {@link HostRoleStatus}s that represent any commands which are
+   * considered to be "In Progress".
+   */
+  public static final EnumSet<HostRoleStatus> IN_PROGRESS_STATUSES = EnumSet.of(
+      HostRoleStatus.QUEUED, HostRoleStatus.IN_PROGRESS, HostRoleStatus.PENDING);
 
   private HostRoleStatus(int status) {
     this.status = status;

http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index 6920a9e..dce8961 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -18,26 +18,30 @@
 
 package org.apache.ambari.server.orm.dao;
 
-import com.google.common.collect.Lists;
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import com.google.inject.Singleton;
-import com.google.inject.persist.Transactional;
-import org.apache.ambari.server.actionmanager.HostRoleStatus;
-import org.apache.ambari.server.orm.RequiresSession;
-import org.apache.ambari.server.orm.entities.HostEntity;
-import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
-import org.apache.ambari.server.orm.entities.StageEntity;
-import javax.persistence.EntityManager;
-import javax.persistence.TypedQuery;
+import static org.apache.ambari.server.orm.DBAccessor.DbType.ORACLE;
+import static org.apache.ambari.server.orm.dao.DaoUtils.ORACLE_LIST_LIMIT;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import static org.apache.ambari.server.orm.DBAccessor.DbType.ORACLE;
-import static org.apache.ambari.server.orm.dao.DaoUtils.ORACLE_LIST_LIMIT;
+
+import javax.persistence.EntityManager;
+import javax.persistence.TypedQuery;
+
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.orm.RequiresSession;
+import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
+
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import com.google.inject.persist.Transactional;
 
 @Singleton
 public class HostRoleCommandDAO {
@@ -194,6 +198,40 @@ public class HostRoleCommandDAO {
     return daoUtils.selectList(query, requestId);
   }
 
+  /**
+   * Gets the commands in a particular status.
+   *
+   * @param statuses
+   *          the statuses to include (not {@code null}).
+   * @return the commands in the given set of statuses.
+   */
+  @RequiresSession
+  public List<HostRoleCommandEntity> findByStatus(
+      Collection<HostRoleStatus> statuses) {
+    TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createNamedQuery(
+        "HostRoleCommandEntity.findByCommandStatuses",
+        HostRoleCommandEntity.class);
+
+    query.setParameter("statuses", statuses);
+    return daoUtils.selectList(query);
+  }
+
+  /**
+   * Gets the number of commands in a particular status.
+   *
+   * @param statuses
+   *          the statuses to include (not {@code null}).
+   * @return the count of commands in the given set of statuses.
+   */
+  @RequiresSession
+  public Number getCountByStatus(Collection<HostRoleStatus> statuses) {
+    TypedQuery<Number> query = entityManagerProvider.get().createNamedQuery(
+        "HostRoleCommandEntity.findCountByCommandStatuses", Number.class);
+
+    query.setParameter("statuses", statuses);
+    return daoUtils.selectSingle(query);
+  }
+
   @RequiresSession
   public List<HostRoleCommandEntity> findAll() {
     return daoUtils.selectAll(entityManagerProvider.get(), HostRoleCommandEntity.class);

http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
index 900dbeb..621ff1c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
@@ -18,21 +18,24 @@
 
 package org.apache.ambari.server.orm.dao;
 
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import com.google.inject.Singleton;
-import com.google.inject.persist.Transactional;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.persistence.EntityManager;
+import javax.persistence.TypedQuery;
+
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.orm.RequiresSession;
 import org.apache.ambari.server.orm.entities.StageEntity;
 import org.apache.ambari.server.orm.entities.StageEntityPK;
 import org.apache.ambari.server.utils.StageUtils;
-import javax.persistence.EntityManager;
-import javax.persistence.TypedQuery;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Collection;
-import java.util.Map;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import com.google.inject.persist.Transactional;
 
 @Singleton
 public class StageDAO {
@@ -82,12 +85,13 @@ public class StageDAO {
   }
 
   @RequiresSession
-  public List<StageEntity> findByCommandStatuses(Collection<HostRoleStatus> statuses) {
-    TypedQuery<StageEntity> query = entityManagerProvider.get().createQuery("SELECT stage " +
-          "FROM StageEntity stage WHERE stage.stageId IN (SELECT hrce.stageId FROM " +
-          "HostRoleCommandEntity hrce WHERE stage.requestId = hrce.requestId and hrce.status IN ?1 ) " +
-          "ORDER BY stage.requestId, stage.stageId", StageEntity.class);
-    return daoUtils.selectList(query, statuses);
+  public List<StageEntity> findByCommandStatuses(
+      Collection<HostRoleStatus> statuses) {
+    TypedQuery<StageEntity> query = entityManagerProvider.get().createNamedQuery(
+        "StageEntity.findByCommandStatuses", StageEntity.class);
+
+    query.setParameter("statuses", statuses);
+    return daoUtils.selectList(query);
   }
 
   @RequiresSession
@@ -114,10 +118,12 @@ public class StageDAO {
       "SELECT stage.requestContext " + "FROM StageEntity stage " +
         "WHERE stage.requestId=?1", String.class);
     String result =  daoUtils.selectOne(query, requestId);
-    if (result != null)
+    if (result != null) {
       return result;
-    else
+    }
+    else {
       return ""; // Since it is defined as empty string in the StageEntity
+    }
   }
 
   @Transactional

http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
index 599156a..375d895 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
@@ -36,6 +36,8 @@ import javax.persistence.JoinColumn;
 import javax.persistence.JoinColumns;
 import javax.persistence.Lob;
 import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
 import javax.persistence.OneToOne;
 import javax.persistence.Table;
 import javax.persistence.TableGenerator;
@@ -45,15 +47,17 @@ import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.commons.lang.ArrayUtils;
 
-@Table(name = "host_role_command")
 @Entity
+@Table(name = "host_role_command")
 @TableGenerator(name = "host_role_command_id_generator",
     table = "ambari_sequences", pkColumnName = "sequence_name", valueColumnName = "sequence_value"
     , pkColumnValue = "host_role_command_id_seq"
     , initialValue = 1
     , allocationSize = 50
 )
-
+@NamedQueries({
+    @NamedQuery(name = "HostRoleCommandEntity.findCountByCommandStatuses", query = "SELECT COUNT(command.taskId) FROM HostRoleCommandEntity command WHERE command.status IN :statuses"),
+    @NamedQuery(name = "HostRoleCommandEntity.findByCommandStatuses", query = "SELECT command FROM HostRoleCommandEntity command WHERE command.status IN :statuses ORDER BY command.requestId, command.stageId") })
 public class HostRoleCommandEntity {
 
   private static int MAX_COMMAND_DETAIL_LENGTH = 250;
@@ -190,7 +194,7 @@ public class HostRoleCommandEntity {
   }
 
   public Role getRole() {
-    return Role.valueOf(this.role);
+    return Role.valueOf(role);
   }
 
   public void setRole(Role role) {
@@ -317,29 +321,66 @@ public class HostRoleCommandEntity {
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
 
     HostRoleCommandEntity that = (HostRoleCommandEntity) o;
 
-    if (attemptCount != null ? !attemptCount.equals(that.attemptCount) : that.attemptCount != null) return false;
-    if (event != null ? !event.equals(that.event) : that.event != null) return false;
-    if (exitcode != null ? !exitcode.equals(that.exitcode) : that.exitcode != null) return false;
-    if (hostName != null ? !hostName.equals(that.hostName) : that.hostName != null) return false;
-    if (lastAttemptTime != null ? !lastAttemptTime.equals(that.lastAttemptTime) : that.lastAttemptTime != null)
+    if (attemptCount != null ? !attemptCount.equals(that.attemptCount) : that.attemptCount != null) {
+      return false;
+    }
+    if (event != null ? !event.equals(that.event) : that.event != null) {
+      return false;
+    }
+    if (exitcode != null ? !exitcode.equals(that.exitcode) : that.exitcode != null) {
+      return false;
+    }
+    if (hostName != null ? !hostName.equals(that.hostName) : that.hostName != null) {
+      return false;
+    }
+    if (lastAttemptTime != null ? !lastAttemptTime.equals(that.lastAttemptTime) : that.lastAttemptTime != null) {
+      return false;
+    }
+    if (requestId != null ? !requestId.equals(that.requestId) : that.requestId != null) {
+      return false;
+    }
+    if (role != null ? !role.equals(that.role) : that.role != null) {
+      return false;
+    }
+    if (stageId != null ? !stageId.equals(that.stageId) : that.stageId != null) {
       return false;
-    if (requestId != null ? !requestId.equals(that.requestId) : that.requestId != null) return false;
-    if (role != null ? !role.equals(that.role) : that.role != null) return false;
-    if (stageId != null ? !stageId.equals(that.stageId) : that.stageId != null) return false;
-    if (startTime != null ? !startTime.equals(that.startTime) : that.startTime != null) return false;
-    if (status != null ? !status.equals(that.status) : that.status != null) return false;
-    if (stdError != null ? !Arrays.equals(stdError, that.stdError) : that.stdError != null) return false;
-    if (stdOut != null ? !Arrays.equals(stdOut, that.stdOut) : that.stdOut != null) return false;
-    if (outputLog != null ? !outputLog.equals(that.outputLog) : that.outputLog != null) return false;
-    if (errorLog != null ? !errorLog.equals(that.errorLog) : that.errorLog != null) return false;
-    if (taskId != null ? !taskId.equals(that.taskId) : that.taskId != null) return false;
-    if (structuredOut != null ? !Arrays.equals(structuredOut, that.structuredOut) : that.structuredOut != null) return false;
-    if (endTime != null ? !endTime.equals(that.endTime) : that.endTime != null) return false;
+    }
+    if (startTime != null ? !startTime.equals(that.startTime) : that.startTime != null) {
+      return false;
+    }
+    if (status != null ? !status.equals(that.status) : that.status != null) {
+      return false;
+    }
+    if (stdError != null ? !Arrays.equals(stdError, that.stdError) : that.stdError != null) {
+      return false;
+    }
+    if (stdOut != null ? !Arrays.equals(stdOut, that.stdOut) : that.stdOut != null) {
+      return false;
+    }
+    if (outputLog != null ? !outputLog.equals(that.outputLog) : that.outputLog != null) {
+      return false;
+    }
+    if (errorLog != null ? !errorLog.equals(that.errorLog) : that.errorLog != null) {
+      return false;
+    }
+    if (taskId != null ? !taskId.equals(that.taskId) : that.taskId != null) {
+      return false;
+    }
+    if (structuredOut != null ? !Arrays.equals(structuredOut, that.structuredOut) : that.structuredOut != null) {
+      return false;
+    }
+    if (endTime != null ? !endTime.equals(that.endTime) : that.endTime != null) {
+      return false;
+    }
 
     return true;
   }
@@ -371,7 +412,7 @@ public class HostRoleCommandEntity {
   }
 
   public void setExecutionCommand(ExecutionCommandEntity executionCommandsByTaskId) {
-    this.executionCommand = executionCommandsByTaskId;
+    executionCommand = executionCommandsByTaskId;
   }
 
   public StageEntity getStage() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
index a7bc948..e87e28b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
@@ -18,14 +18,28 @@
 
 package org.apache.ambari.server.orm.entities;
 
-import javax.persistence.*;
+import static org.apache.commons.lang.StringUtils.defaultString;
+
 import java.util.Collection;
 
-import static org.apache.commons.lang.StringUtils.defaultString;
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.Id;
+import javax.persistence.IdClass;
+import javax.persistence.JoinColumn;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.OneToMany;
+import javax.persistence.Table;
 
-@IdClass(org.apache.ambari.server.orm.entities.StageEntityPK.class)
-@Table(name = "stage")
 @Entity
+@Table(name = "stage")
+@IdClass(org.apache.ambari.server.orm.entities.StageEntityPK.class)
+@NamedQueries({ @NamedQuery(name = "StageEntity.findByCommandStatuses", query = "SELECT stage from StageEntity stage WHERE EXISTS (SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId ) ORDER by stage.requestId, stage.stageId") })
 public class StageEntity {
 
   @Column(name = "cluster_id", updatable = false, nullable = false)
@@ -47,11 +61,11 @@ public class StageEntity {
   @Column(name = "request_context")
   @Basic
   private String requestContext = "";
-  
+
   @Column(name = "cluster_host_info")
   @Basic
   private byte[] clusterHostInfo;
- 
+
   @Column(name = "command_params")
   @Basic
   private byte[] commandParamsStage;
@@ -63,7 +77,7 @@ public class StageEntity {
   @ManyToOne
   @JoinColumn(name = "request_id", referencedColumnName = "request_id", nullable = false)
   private RequestEntity request;
-  
+
 
   @OneToMany(mappedBy = "stage", cascade = CascadeType.REMOVE, fetch = FetchType.LAZY)
   private Collection<HostRoleCommandEntity> hostRoleCommands;
@@ -114,7 +128,7 @@ public class StageEntity {
   public void setClusterHostInfo(String clusterHostInfo) {
     this.clusterHostInfo = clusterHostInfo.getBytes();
   }
- 
+
   public String getCommandParamsStage() {
     return commandParamsStage == null ? new String() : new String(commandParamsStage);
   }
@@ -139,18 +153,36 @@ public class StageEntity {
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
 
     StageEntity that = (StageEntity) o;
 
-    if (clusterId != null ? !clusterId.equals(that.clusterId) : that.clusterId != null) return false;
-    if (logInfo != null ? !logInfo.equals(that.logInfo) : that.logInfo != null) return false;
-    if (requestId != null ? !requestId.equals(that.requestId) : that.requestId != null) return false;
-    if (stageId != null ? !stageId.equals(that.stageId) : that.stageId != null) return false;
-    if (clusterHostInfo != null ? !clusterHostInfo.equals(that.clusterHostInfo) : that.clusterHostInfo != null) return false;
-    if (commandParamsStage != null ? !commandParamsStage.equals(that.commandParamsStage) : that.commandParamsStage != null) return false;
-    if (hostParamsStage != null ? !hostParamsStage.equals(that.hostParamsStage) : that.hostParamsStage != null) return false;
+    if (clusterId != null ? !clusterId.equals(that.clusterId) : that.clusterId != null) {
+      return false;
+    }
+    if (logInfo != null ? !logInfo.equals(that.logInfo) : that.logInfo != null) {
+      return false;
+    }
+    if (requestId != null ? !requestId.equals(that.requestId) : that.requestId != null) {
+      return false;
+    }
+    if (stageId != null ? !stageId.equals(that.stageId) : that.stageId != null) {
+      return false;
+    }
+    if (clusterHostInfo != null ? !clusterHostInfo.equals(that.clusterHostInfo) : that.clusterHostInfo != null) {
+      return false;
+    }
+    if (commandParamsStage != null ? !commandParamsStage.equals(that.commandParamsStage) : that.commandParamsStage != null) {
+      return false;
+    }
+    if (hostParamsStage != null ? !hostParamsStage.equals(that.hostParamsStage) : that.hostParamsStage != null) {
+      return false;
+    }
     return !(requestContext != null ? !requestContext.equals(that.requestContext) : that.requestContext != null);
 
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
index 36acbc2..d751f2d 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
@@ -17,15 +17,19 @@
  */
 package org.apache.ambari.server.actionmanager;
 
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.Singleton;
-import com.google.inject.persist.PersistService;
-import com.google.inject.persist.UnitOfWork;
-import com.google.inject.util.Modules;
+import static org.apache.ambari.server.orm.DBAccessor.DbType.ORACLE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import javax.persistence.EntityManager;
+
 import junit.framework.Assert;
+
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
@@ -40,6 +44,7 @@ import org.apache.ambari.server.orm.DBAccessor;
 import org.apache.ambari.server.orm.DBAccessorImpl;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.orm.dao.DaoUtils;
 import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
 import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
 import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
@@ -51,13 +56,16 @@ import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.apache.ambari.server.orm.DBAccessor.DbType.ORACLE;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import com.google.inject.persist.PersistService;
+import com.google.inject.persist.UnitOfWork;
+import com.google.inject.util.Modules;
 
 public class TestActionDBAccessorImpl {
   private static final Logger log = LoggerFactory.getLogger(TestActionDBAccessorImpl.class);
@@ -73,11 +81,19 @@ public class TestActionDBAccessorImpl {
 
   @Inject
   private Clusters clusters;
+
   @Inject
   private ExecutionCommandDAO executionCommandDAO;
+
   @Inject
   private HostRoleCommandDAO hostRoleCommandDAO;
 
+  @Inject
+  private Provider<EntityManager> entityManagerProvider;
+
+  @Inject
+  private DaoUtils daoUtils;
+
   @Before
   public void setup() throws AmbariException {
     InMemoryDefaultTestModule defaultTestModule = new InMemoryDefaultTestModule();
@@ -157,30 +173,106 @@ public class TestActionDBAccessorImpl {
             "(command report status should be ignored)",
             HostRoleStatus.ABORTED,s.getHostRoleStatus(hostname, "HBASE_MASTER"));
   }
-  
+
   @Test
   public void testGetStagesInProgress() throws AmbariException {
-    String hostname = "host1";
     List<Stage> stages = new ArrayList<Stage>();
-    stages.add(createStubStage(hostname, requestId, stageId));
-    stages.add(createStubStage(hostname, requestId, stageId + 1));
+    stages.add(createStubStage(hostName, requestId, stageId));
+    stages.add(createStubStage(hostName, requestId, stageId + 1));
     Request request = new Request(stages, clusters);
     db.persistActions(request);
     assertEquals(2, stages.size());
   }
-  
+
   @Test
   public void testGetStagesInProgressWithFailures() throws AmbariException {
-    String hostname = "host1";
-    populateActionDB(db, hostname, requestId, stageId);
-    populateActionDB(db, hostname, requestId+1, stageId);
-    db.abortOperation(requestId);
+    populateActionDB(db, hostName, requestId, stageId);
+    populateActionDB(db, hostName, requestId + 1, stageId);
     List<Stage> stages = db.getStagesInProgress();
+    assertEquals(2, stages.size());
+
+    db.abortOperation(requestId);
+    stages = db.getStagesInProgress();
     assertEquals(1, stages.size());
     assertEquals(requestId+1, stages.get(0).getRequestId());
   }
 
   @Test
+  public void testGetStagesInProgressWithManyStages() throws AmbariException {
+    // create 3 request; each request will have 3 stages, each stage 2 commands
+    populateActionDBMultipleStages(3, db, hostName, requestId, stageId);
+    populateActionDBMultipleStages(3, db, hostName, requestId + 1, stageId + 3);
+    populateActionDBMultipleStages(3, db, hostName, requestId + 2, stageId + 3);
+
+    // verify stages and proper ordering
+    int commandsInProgressCount = db.getCommandsInProgressCount();
+    List<Stage> stages = db.getStagesInProgress();
+    assertEquals(18, commandsInProgressCount);
+    assertEquals(9, stages.size());
+
+    long lastRequestId = Integer.MIN_VALUE;
+    for (Stage stage : stages) {
+      assertTrue(stage.getRequestId() >= lastRequestId);
+      lastRequestId = stage.getRequestId();
+    }
+
+    // cancel the first one, removing 3 stages
+    db.abortOperation(requestId);
+
+    // verify stages and proper ordering
+    commandsInProgressCount = db.getCommandsInProgressCount();
+    stages = db.getStagesInProgress();
+    assertEquals(12, commandsInProgressCount);
+    assertEquals(6, stages.size());
+
+    // find the first stage, and change one command to COMPLETED
+    stages.get(0).setHostRoleStatus(hostName, Role.HBASE_MASTER.toString(),
+        HostRoleStatus.COMPLETED);
+
+    db.hostRoleScheduled(stages.get(0), hostName, Role.HBASE_MASTER.toString());
+
+    // the first stage still has at least 1 command IN_PROGRESS
+    commandsInProgressCount = db.getCommandsInProgressCount();
+    stages = db.getStagesInProgress();
+    assertEquals(11, commandsInProgressCount);
+    assertEquals(6, stages.size());
+
+    // find the first stage, and change the other command to COMPLETED
+    stages.get(0).setHostRoleStatus(hostName,
+        Role.HBASE_REGIONSERVER.toString(), HostRoleStatus.COMPLETED);
+
+    db.hostRoleScheduled(stages.get(0), hostName,
+        Role.HBASE_REGIONSERVER.toString());
+
+    // verify stages and proper ordering
+    commandsInProgressCount = db.getCommandsInProgressCount();
+    stages = db.getStagesInProgress();
+    assertEquals(10, commandsInProgressCount);
+    assertEquals(5, stages.size());
+  }
+
+  @Test
+  public void testGetStagesInProgressWithManyCommands() throws AmbariException {
+    // 1000 hosts
+    for (int i = 0; i < 1000; i++) {
+      String hostName = "c64-" + i;
+      clusters.addHost(hostName);
+      clusters.getHost(hostName).persist();
+    }
+
+    // create 1 request, 3 stages per host, each with 2 commands
+    for (int i = 0; i < 1000; i++) {
+      String hostName = "c64-" + i;
+      populateActionDBMultipleStages(3, db, hostName, requestId + i, stageId);
+    }
+
+    int commandsInProgressCount = db.getCommandsInProgressCount();
+    List<Stage> stages = db.getStagesInProgress();
+    assertEquals(6000, commandsInProgressCount);
+    assertEquals(3000, stages.size());
+  }
+
+  @Test
   public void testPersistActions() throws AmbariException {
     populateActionDB(db, hostName, requestId, stageId);
     for (Stage stage : db.getAllStages(requestId)) {
@@ -310,7 +402,7 @@ public class TestActionDBAccessorImpl {
     populateActionDB(db, hostName, requestId + 1, stageId);
     List<Long> requestIdsResult =
       db.getRequestsByStatus(null, BaseRequest.DEFAULT_PAGE_SIZE, false);
-    
+
     assertNotNull("List of request IDs is null", requestIdsResult);
     assertEquals("Request IDs not matches", requestIds, requestIdsResult);
   }
@@ -488,6 +580,20 @@ public class TestActionDBAccessorImpl {
     db.persistActions(request);
   }
 
+  private void populateActionDBMultipleStages(int numberOfStages,
+      ActionDBAccessor db, String hostname, long requestId, long stageId)
+      throws AmbariException {
+
+    List<Stage> stages = new ArrayList<Stage>();
+    for (int i = 0; i < numberOfStages; i++) {
+      Stage stage = createStubStage(hostname, requestId, stageId + i);
+      stages.add(stage);
+    }
+
+    Request request = new Request(stages, clusters);
+    db.persistActions(request);
+  }
+
   private Stage createStubStage(String hostname, long requestId, long stageId) {
     Stage s = new Stage(requestId, "/a/b", "cluster1", 1L, "action db accessor test",
       "clusterHostInfo", "commandParamsStage", "hostParamsStage");

http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index 7224924..e1db9f8 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -17,13 +17,21 @@
  */
 package org.apache.ambari.server.actionmanager;
 
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyCollectionOf;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.*;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -36,9 +44,8 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.reflect.TypeToken;
-import com.google.inject.persist.UnitOfWork;
 import junit.framework.Assert;
+
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
@@ -69,7 +76,6 @@ import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEve
 import org.apache.ambari.server.utils.StageUtils;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
@@ -77,6 +83,9 @@ import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.reflect.TypeToken;
+import com.google.inject.persist.UnitOfWork;
+
 public class TestActionScheduler {
 
   private static final Logger log = LoggerFactory.getLogger(TestActionScheduler.class);
@@ -96,7 +105,7 @@ public class TestActionScheduler {
    */
   @Test
   public void testActionSchedule() throws Exception {
-    
+
     Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
     Map<String, List<String>> clusterHostInfo = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type);
 
@@ -116,7 +125,7 @@ public class TestActionScheduler {
     when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
     when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
-    
+
     Host host = mock(Host.class);
     HashMap<String, ServiceComponentHost> hosts =
             new HashMap<String, ServiceComponentHost>();
@@ -132,6 +141,7 @@ public class TestActionScheduler {
     Stage s = StageUtils.getATestStage(1, 977, hostname, CLUSTER_HOST_INFO,
       "{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
     stages.add(s);
+    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
     when(db.getStagesInProgress()).thenReturn(stages);
 
     Request request = mock(Request.class);
@@ -167,7 +177,7 @@ public class TestActionScheduler {
       int expectedQueueSize, ActionScheduler scheduler) {
     int cycleCount = 0;
     while (cycleCount++ <= MAX_CYCLE_ITERATIONS) {
-      List<AgentCommand> ac = aq.dequeueAll(hostname);      
+      List<AgentCommand> ac = aq.dequeueAll(hostname);
       if (ac != null) {
         if (ac.size() == expectedQueueSize) {
           return ac;
@@ -220,6 +230,7 @@ public class TestActionScheduler {
     stages.add(s);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
     when(db.getStagesInProgress()).thenReturn(stages);
     Request request = mock(Request.class);
     when(request.isExclusive()).thenReturn(false);
@@ -292,7 +303,7 @@ public class TestActionScheduler {
     Request request = mock(Request.class);
     when(request.isExclusive()).thenReturn(false);
     when(db.getRequest(anyLong())).thenReturn(request);
-
+    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
     when(db.getStagesInProgress()).thenReturn(stages);
     doAnswer(new Answer() {
       @Override
@@ -379,7 +390,7 @@ public class TestActionScheduler {
     Request request = mock(Request.class);
     when(request.isExclusive()).thenReturn(false);
     when(db.getRequest(anyLong())).thenReturn(request);
-
+    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
     when(db.getStagesInProgress()).thenReturn(stages);
 
     doAnswer(new Answer() {
@@ -508,7 +519,7 @@ public class TestActionScheduler {
     Request request = mock(Request.class);
     when(request.isExclusive()).thenReturn(false);
     when(db.getRequest(anyLong())).thenReturn(request);
-
+    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
     when(db.getStagesInProgress()).thenReturn(stages);
     doAnswer(new Answer() {
       @Override
@@ -572,7 +583,7 @@ public class TestActionScheduler {
     Request request = mock(Request.class);
     when(request.isExclusive()).thenReturn(false);
     when(db.getRequest(anyLong())).thenReturn(request);
-
+    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
     when(db.getStagesInProgress()).thenReturn(stages);
     doAnswer(new Answer() {
       @Override
@@ -680,7 +691,7 @@ public class TestActionScheduler {
     Request request = mock(Request.class);
     when(request.isExclusive()).thenReturn(false);
     when(db.getRequest(anyLong())).thenReturn(request);
-
+    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
     when(db.getStagesInProgress()).thenReturn(stages);
 
     Properties properties = new Properties();
@@ -764,7 +775,7 @@ public class TestActionScheduler {
     Request request = mock(Request.class);
     when(request.isExclusive()).thenReturn(false);
     when(db.getRequest(anyLong())).thenReturn(request);
-
+    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
     when(db.getStagesInProgress()).thenReturn(stages);
 
     Properties properties = new Properties();
@@ -805,7 +816,7 @@ public class TestActionScheduler {
     when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
     when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
-    
+
     String hostname1 = "ahost.ambari.apache.org";
     String hostname2 = "bhost.ambari.apache.org";
     HashMap<String, ServiceComponentHost> hosts =
@@ -813,48 +824,48 @@ public class TestActionScheduler {
     hosts.put(hostname1, sch);
     hosts.put(hostname2, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
-    
+
     List<Stage> stages = new ArrayList<Stage>();
     Stage backgroundStage = null;
     stages.add(//stage with background command
         backgroundStage = getStageWithSingleTask(
             hostname1, "cluster1", Role.NAMENODE, RoleCommand.CUSTOM_COMMAND, "REBALANCEHDFS", Service.Type.HDFS, 1, 1, 1));
-    
+
     Assert.assertEquals(AgentCommandType.BACKGROUND_EXECUTION_COMMAND ,backgroundStage.getExecutionCommands(hostname1).get(0).getExecutionCommand().getCommandType());
-    
+
     stages.add( // Stage with the same hostname, should be scheduled
         getStageWithSingleTask(
             hostname1, "cluster1", Role.GANGLIA_MONITOR,
             RoleCommand.START, Service.Type.GANGLIA, 2, 2, 2));
-    
+
     stages.add(
         getStageWithSingleTask(
             hostname2, "cluster1", Role.DATANODE,
             RoleCommand.START, Service.Type.HDFS, 3, 3, 3));
-    
-    
+
+
     ActionDBAccessor db = mock(ActionDBAccessor.class);
 
     Request request = mock(Request.class);
     when(request.isExclusive()).thenReturn(false);
     when(db.getRequest(anyLong())).thenReturn(request);
-
+    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
     when(db.getStagesInProgress()).thenReturn(stages);
-    
+
     Properties properties = new Properties();
     properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "true");
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null), new ServerActionManagerImpl(fsm),
         unitOfWork, conf);
-    
+
     ActionManager am = new ActionManager(
         2, 2, aq, fsm, db, new HostsMap((String) null),
         new ServerActionManagerImpl(fsm), unitOfWork,
         requestFactory, conf);
-    
+
     scheduler.doWork();
-    
+
     Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(hostname1, "NAMENODE"));
     Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(2).getHostRoleStatus(hostname2, "DATANODE"));
 
@@ -901,7 +912,7 @@ public class TestActionScheduler {
     Request request = mock(Request.class);
     when(request.isExclusive()).thenReturn(false);
     when(db.getRequest(anyLong())).thenReturn(request);
-
+    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
     when(db.getStagesInProgress()).thenReturn(stages);
     doAnswer(new Answer() {
       @Override
@@ -1082,7 +1093,7 @@ public class TestActionScheduler {
     Request request = mock(Request.class);
     when(request.isExclusive()).thenReturn(false);
     when(db.getRequest(anyLong())).thenReturn(request);
-
+    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
     when(db.getStagesInProgress()).thenReturn(stages);
     doAnswer(new Answer() {
       @Override
@@ -1259,7 +1270,7 @@ public class TestActionScheduler {
     Request request = mock(Request.class);
     when(request.isExclusive()).thenReturn(false);
     when(db.getRequest(anyLong())).thenReturn(request);
-
+    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
     when(db.getStagesInProgress()).thenReturn(stages);
     doAnswer(new Answer() {
       @Override
@@ -1419,7 +1430,7 @@ public class TestActionScheduler {
     assertEquals(new Float(1.0), new Float(s.getSuccessFactor(Role.NAMENODE)));
     assertEquals(new Float(1.0), new Float(s.getSuccessFactor(Role.GANGLIA_SERVER)));
   }
-  
+
   @Test
   public void testSuccessCriteria() {
     RoleStats rs1 = new RoleStats(1, (float)0.5);
@@ -1427,37 +1438,37 @@ public class TestActionScheduler {
     assertTrue(rs1.isSuccessFactorMet());
     rs1.numSucceeded = 0;
     assertFalse(rs1.isSuccessFactorMet());
-    
+
     RoleStats rs2 = new RoleStats(2, (float)0.5);
     rs2.numSucceeded = 1;
     assertTrue(rs2.isSuccessFactorMet());
-    
+
     RoleStats rs3 = new RoleStats(3, (float)0.5);
     rs3.numSucceeded = 2;
     assertTrue(rs2.isSuccessFactorMet());
     rs3.numSucceeded = 1;
     assertFalse(rs3.isSuccessFactorMet());
-    
+
     RoleStats rs4 = new RoleStats(3, (float)1.0);
     rs4.numSucceeded = 2;
     assertFalse(rs3.isSuccessFactorMet());
   }
-  
+
   /**
    * This test sends verifies that ActionScheduler returns up-to-date cluster host info and caching works correctly.
    */
   @Test
   public void testClusterHostInfoCache() throws Exception {
-    
+
     Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
-    
+
     //Data for stages
     Map<String, Set<String>> clusterHostInfo1 = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type);
     Map<String, Set<String>> clusterHostInfo2 = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO_UPDATED, type);
     int stageId = 1;
     int requestId1 = 1;
     int requestId2 = 2;
-    
+
     ActionQueue aq = new ActionQueue();
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
@@ -1492,6 +1503,7 @@ public class TestActionScheduler {
       "{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
     Stage s2 = StageUtils.getATestStage(requestId2, stageId, hostname, CLUSTER_HOST_INFO_UPDATED,
       "{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
+    when(db.getCommandsInProgressCount()).thenReturn(1);
     when(db.getStagesInProgress()).thenReturn(Collections.singletonList(s1));
 
     //Keep large number of attempts so that the task is not expired finally
@@ -1504,12 +1516,12 @@ public class TestActionScheduler {
 
     assertTrue(ac.get(0) instanceof ExecutionCommand);
     assertEquals(String.valueOf(requestId1) + "-" + stageId, ((ExecutionCommand) (ac.get(0))).getCommandId());
-    
+
     assertEquals(clusterHostInfo1, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo());
-    
 
+    when(db.getCommandsInProgressCount()).thenReturn(1);
     when(db.getStagesInProgress()).thenReturn(Collections.singletonList(s2));
-    
+
     //Verify that ActionSheduler does not return cached value of cluster host info for new requestId
     ac = waitForQueueSize(hostname, aq, 1, scheduler);
     assertTrue(ac.get(0) instanceof ExecutionCommand);
@@ -1572,7 +1584,7 @@ public class TestActionScheduler {
     Request request = mock(Request.class);
     when(request.isExclusive()).thenReturn(false);
     when(db.getRequest(anyLong())).thenReturn(request);
-
+    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
     when(db.getStagesInProgress()).thenReturn(stages);
 
     ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
@@ -1655,7 +1667,7 @@ public class TestActionScheduler {
     Request request = mock(Request.class);
     when(request.isExclusive()).thenReturn(false);
     when(db.getRequest(anyLong())).thenReturn(request);
-
+    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
     when(db.getStagesInProgress()).thenReturn(stages);
     doAnswer(new Answer() {
       @Override
@@ -1728,7 +1740,7 @@ public class TestActionScheduler {
     Request request = mock(Request.class);
     when(request.isExclusive()).thenReturn(false);
     when(db.getRequest(anyLong())).thenReturn(request);
-
+    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
     when(db.getStagesInProgress()).thenReturn(stages);
 
     List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();
@@ -1894,7 +1906,7 @@ public class TestActionScheduler {
     when(host3.getHostName()).thenReturn(hostname);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
-
+    when(db.getCommandsInProgressCount()).thenReturn(stagesInProgress.size());
     when(db.getStagesInProgress()).thenReturn(stagesInProgress);
 
     List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();


Mime
View raw message