Return-Path: X-Original-To: apmail-ambari-commits-archive@www.apache.org Delivered-To: apmail-ambari-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AA1E910AFD for ; Wed, 4 Mar 2015 21:43:26 +0000 (UTC) Received: (qmail 32904 invoked by uid 500); 4 Mar 2015 21:43:04 -0000 Delivered-To: apmail-ambari-commits-archive@ambari.apache.org Received: (qmail 32878 invoked by uid 500); 4 Mar 2015 21:43:04 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 32869 invoked by uid 99); 4 Mar 2015 21:43:04 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Mar 2015 21:43:04 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6DA68E03B1; Wed, 4 Mar 2015 21:43:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jonathanhurley@apache.org To: commits@ambari.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ambari git commit: AMBARI-9334 - Ambari StageDAO.findByCommandStatuses causes Postgress HIGH CPU (jonathanhurley) Date: Wed, 4 Mar 2015 21:43:04 +0000 (UTC) Repository: ambari Updated Branches: refs/heads/branch-1.7.0 3271d1d1f -> 7566e570a AMBARI-9334 - Ambari StageDAO.findByCommandStatuses causes Postgress HIGH CPU (jonathanhurley) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7566e570 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7566e570 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7566e570 Branch: refs/heads/branch-1.7.0 Commit: 7566e570aff4622e4eee7023325583244564a3af Parents: 3271d1d Author: Jonathan Hurley Authored: Wed Mar 4 16:06:02 2015 -0500 Committer: Jonathan Hurley Committed: Wed Mar 4 16:06:02 2015 -0500 ---------------------------------------------------------------------- .../server/actionmanager/ActionDBAccessor.java | 33 ++-- .../actionmanager/ActionDBAccessorImpl.java | 85 ++++++---- .../server/actionmanager/ActionManager.java | 29 ++-- .../server/actionmanager/ActionScheduler.java | 47 ++++-- .../server/actionmanager/HostRoleStatus.java | 7 + .../server/orm/dao/HostRoleCommandDAO.java | 66 ++++++-- .../apache/ambari/server/orm/dao/StageDAO.java | 42 ++--- .../orm/entities/HostRoleCommandEntity.java | 87 ++++++++--- .../ambari/server/orm/entities/StageEntity.java | 66 ++++++-- .../actionmanager/TestActionDBAccessorImpl.java | 156 ++++++++++++++++--- .../actionmanager/TestActionScheduler.java | 100 ++++++------ 11 files changed, 497 insertions(+), 221 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java index 1f99b4a..725fa69 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java @@ -17,15 +17,15 @@ */ package org.apache.ambari.server.actionmanager; -import com.google.inject.persist.Transactional; +import java.util.Collection; +import java.util.List; +import java.util.Map; + import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.agent.CommandReport; import org.apache.ambari.server.agent.ExecutionCommand; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; +import com.google.inject.persist.Transactional; public interface ActionDBAccessor { @@ -57,12 +57,26 @@ public interface ActionDBAccessor { public void timeoutHostRole(String host, long requestId, long stageId, String role); /** - * Returns all the pending stages, including queued and not-queued. - * A stage is considered in progress if it is in progress for any host. + * Returns all the pending stages, including queued and not-queued. A stage is + * considered in progress if it is in progress for any host. + *

+ * The results will be sorted by request ID and then stage ID making this call + * expensive in some scenarios. Use {@link #getCommandsInProgressCount()} in + * order to determine if there are stages that are in progress before getting + * the stages from this method. + * + * @see HostRoleStatus#IN_PROGRESS_STATUSES */ public List getStagesInProgress(); /** + * Gets the number of commands in progress. + * + * @return the number of commands in progress. + */ + public int getCommandsInProgressCount(); + + /** * Persists all tasks for a given request * @param request request object */ @@ -149,11 +163,6 @@ public interface ActionDBAccessor { public Collection getTasks(Collection taskIds); /** - * Get all stages that contain tasks with specified host role statuses - */ - public List getStagesByHostRoleStatus(Set statuses); - - /** * Gets the host role command corresponding to the task id */ public HostRoleCommand getTask(long taskId); http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java index 5e879cc..220fb95 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java @@ -17,12 +17,18 @@ */ package org.apache.ambari.server.actionmanager; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.inject.Inject; -import com.google.inject.Singleton; -import com.google.inject.name.Named; -import com.google.inject.persist.Transactional; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.agent.CommandReport; import org.apache.ambari.server.agent.ExecutionCommand; @@ -48,49 +54,55 @@ import org.apache.ambari.server.utils.StageUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.google.inject.name.Named; +import com.google.inject.persist.Transactional; @Singleton public class ActionDBAccessorImpl implements ActionDBAccessor { private static final Logger LOG = LoggerFactory.getLogger(ActionDBAccessorImpl.class); + private long requestId; + @Inject ClusterDAO clusterDAO; + @Inject HostDAO hostDAO; + @Inject RequestDAO requestDAO; + @Inject StageDAO stageDAO; + @Inject HostRoleCommandDAO hostRoleCommandDAO; + @Inject ExecutionCommandDAO executionCommandDAO; + @Inject RoleSuccessCriteriaDAO roleSuccessCriteriaDAO; + @Inject StageFactory stageFactory; + @Inject RequestFactory requestFactory; + @Inject HostRoleCommandFactory hostRoleCommandFactory; + @Inject Clusters clusters; + @Inject RequestScheduleDAO requestScheduleDAO; - - private Cache hostRoleCommandCache; private long cacheLimit; //may be exceeded to store tasks from one request @@ -186,21 +198,34 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { endRequestIfCompleted(requestId); } - /* (non-Javadoc) - * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#getPendingStages() + /** + * {@inheritDoc} */ @Override public List getStagesInProgress() { List stages = new ArrayList(); - List statuses = - Arrays.asList(HostRoleStatus.QUEUED, HostRoleStatus.IN_PROGRESS, - HostRoleStatus.PENDING); - for (StageEntity stageEntity : stageDAO.findByCommandStatuses(statuses)) { + List stageEntities = stageDAO.findByCommandStatuses(HostRoleStatus.IN_PROGRESS_STATUSES); + + for (StageEntity stageEntity : stageEntities) { stages.add(stageFactory.createExisting(stageEntity)); } + return stages; } + /** + * {@inheritDoc} + */ + @Override + public int getCommandsInProgressCount() { + Number count = hostRoleCommandDAO.getCountByStatus(HostRoleStatus.IN_PROGRESS_STATUSES); + if (null == count) { + return 0; + } + + return count.intValue(); + } + @Override @Transactional public void persistActions(Request request) throws AmbariException { @@ -212,7 +237,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { if (clusterEntity != null) { clusterId = clusterEntity.getClusterId(); } - + requestEntity.setClusterId(clusterId); requestDAO.create(requestEntity); @@ -550,14 +575,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { } @Override - public List getStagesByHostRoleStatus(Set statuses) { - List stages = new ArrayList(); - for (StageEntity stageEntity : stageDAO.findByCommandStatuses(statuses)) { - stages.add(stageFactory.createExisting(stageEntity)); - } - return stages; - } - public HostRoleCommand getTask(long taskId) { HostRoleCommandEntity commandEntity = hostRoleCommandDAO.findByPK((int) taskId); if (commandEntity == null) { http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java index e2fad5f..fb0b3aa 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java @@ -17,14 +17,16 @@ */ package org.apache.ambari.server.actionmanager; -import com.google.inject.Inject; -import com.google.inject.Singleton; -import com.google.inject.name.Named; -import com.google.inject.persist.UnitOfWork; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.agent.ActionQueue; import org.apache.ambari.server.agent.CommandReport; -import org.apache.ambari.server.api.services.BaseRequest; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.ExecuteActionRequest; import org.apache.ambari.server.controller.HostsMap; @@ -34,13 +36,10 @@ import org.apache.ambari.server.utils.StageUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.google.inject.name.Named; +import com.google.inject.persist.UnitOfWork; /** @@ -62,7 +61,7 @@ public class ActionManager { ActionQueue aq, Clusters fsm, ActionDBAccessor db, HostsMap hostsMap, ServerActionManager serverActionManager, UnitOfWork unitOfWork, RequestFactory requestFactory, Configuration configuration) { - this.actionQueue = aq; + actionQueue = aq; this.db = db; scheduler = new ActionScheduler(schedulerSleepTime, actionTimeout, db, actionQueue, fsm, 2, hostsMap, serverActionManager, unitOfWork, configuration); @@ -204,10 +203,6 @@ public class ActionManager { return db.getTasks(taskIds); } - public List getRequestsByHostRoleStatus(Set statuses) { - return db.getStagesByHostRoleStatus(statuses); - } - /** * Get first or last maxResults requests that are in the specified status * http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java index 81fee75..e043d79 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java @@ -128,22 +128,22 @@ class ActionScheduler implements Runnable { ActionDBAccessor db, ActionQueue actionQueue, Clusters fsmObject, int maxAttempts, HostsMap hostsMap, ServerActionManager serverActionManager, UnitOfWork unitOfWork, Configuration configuration) { - this.sleepTime = sleepTimeMilliSec; + sleepTime = sleepTimeMilliSec; this.hostsMap = hostsMap; - this.actionTimeout = actionTimeoutMilliSec; + actionTimeout = actionTimeoutMilliSec; this.db = db; this.actionQueue = actionQueue; this.fsmObject = fsmObject; this.maxAttempts = (short) maxAttempts; this.serverActionManager = serverActionManager; this.unitOfWork = unitOfWork; - this.clusterHostInfoCache = CacheBuilder.newBuilder(). + clusterHostInfoCache = CacheBuilder.newBuilder(). expireAfterAccess(5, TimeUnit.MINUTES). build(); - this.commandParamsStageCache = CacheBuilder.newBuilder(). + commandParamsStageCache = CacheBuilder.newBuilder(). expireAfterAccess(5, TimeUnit.MINUTES). build(); - this.hostParamsStageCache = CacheBuilder.newBuilder(). + hostParamsStageCache = CacheBuilder.newBuilder(). expireAfterAccess(5, TimeUnit.MINUTES). build(); this.configuration = configuration; @@ -202,6 +202,18 @@ class ActionScheduler implements Runnable { // The first thing to do is to abort requests that are cancelled processCancelledRequestsList(); + // !!! getting the stages in progress could be a very expensive call due + // to the join being used; there's no need to make it if there are + // no commands in progress + if (db.getCommandsInProgressCount() == 0) { + // Nothing to do + if (LOG.isDebugEnabled()) { + LOG.debug("There are no stages currently in progress."); + } + + return; + } + Set runningRequestIds = new HashSet(); List stages = db.getStagesInProgress(); if (LOG.isDebugEnabled()) { @@ -209,14 +221,14 @@ class ActionScheduler implements Runnable { LOG.debug("Processing {} in progress stages ", stages.size()); } if (stages.isEmpty()) { - //Nothing to do + // Nothing to do if (LOG.isDebugEnabled()) { - LOG.debug("No stage in progress..nothing to do"); + LOG.debug("There are no stages currently in progress."); } return; } int i_stage = 0; - + stages = filterParallelPerHostStages(stages); boolean exclusiveRequestIsGoing = false; @@ -420,7 +432,7 @@ class ActionScheduler implements Runnable { s.setHostRoleStatus(hostName, roleName, HostRoleStatus.QUEUED); db.hostRoleScheduled(s, hostName, roleName); String actionName = cmd.getRoleParams().get(ServerAction.ACTION_NAME); - this.serverActionManager.executeAction(actionName, cmd.getCommandParams()); + serverActionManager.executeAction(actionName, cmd.getCommandParams()); reportServerActionSuccess(s, cmd); } catch (AmbariException e) { @@ -569,12 +581,12 @@ class ActionScheduler implements Runnable { // Check that service host component is not deleted if (hostDeleted) { - + String message = String.format( "Host not found when trying to schedule an execution command. " + "The most probable reason for that is that host or host component " + "has been deleted recently. The command has been aborted and dequeued." + - "Execution command details: " + + "Execution command details: " + "cmdId: %s; taskId: %s; roleCommand: %s", c.getCommandId(), c.getTaskId(), c.getRoleCommand()); LOG.warn("Host {} has been detected as non-available. {}", host, message); @@ -609,7 +621,7 @@ class ActionScheduler implements Runnable { LOG.trace("===>commandsToSchedule(first_time)=" + commandsToSchedule.size()); } - this.updateRoleStats(status, roleStats.get(roleStr)); + updateRoleStats(status, roleStats.get(roleStr)); } } LOG.debug("Collected {} commands to schedule in this wakeup.", commandsToSchedule.size()); @@ -772,7 +784,7 @@ class ActionScheduler implements Runnable { } cmd.setClusterHostInfo(clusterHostInfo); - + //Try to get commandParams from cache and merge them with command-level parameters Map commandParams = commandParamsStageCache.getIfPresent(stagePk); @@ -888,10 +900,10 @@ class ActionScheduler implements Runnable { LOG.error("Unknown status " + status.name()); } } - - + + public void setTaskTimeoutAdjustment(boolean val) { - this.taskTimeoutAdjustment = val; + taskTimeoutAdjustment = val; } static class RoleStats { @@ -906,7 +918,7 @@ class ActionScheduler implements Runnable { final float successFactor; RoleStats(int total, float successFactor) { - this.totalHosts = total; + totalHosts = total; this.successFactor = successFactor; } @@ -938,6 +950,7 @@ class ActionScheduler implements Runnable { } } + @Override public String toString() { StringBuilder builder = new StringBuilder(); builder.append("numQueued="+numQueued); http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java index 447aead..b0ebe83 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java @@ -19,6 +19,7 @@ package org.apache.ambari.server.actionmanager; import java.util.Arrays; import java.util.Collections; +import java.util.EnumSet; import java.util.List; public enum HostRoleStatus { @@ -34,6 +35,12 @@ public enum HostRoleStatus { private static List COMPLETED_STATES = Arrays.asList(FAILED, TIMEDOUT, ABORTED, COMPLETED); private static List FAILED_STATES = Arrays.asList(FAILED, TIMEDOUT, ABORTED); + /** + * The {@link HostRoleStatus}s that represent any commands which are + * considered to be "In Progress". + */ + public static final EnumSet IN_PROGRESS_STATUSES = EnumSet.of( + HostRoleStatus.QUEUED, HostRoleStatus.IN_PROGRESS, HostRoleStatus.PENDING); private HostRoleStatus(int status) { this.status = status; http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java index 6920a9e..dce8961 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java @@ -18,26 +18,30 @@ package org.apache.ambari.server.orm.dao; -import com.google.common.collect.Lists; -import com.google.inject.Inject; -import com.google.inject.Provider; -import com.google.inject.Singleton; -import com.google.inject.persist.Transactional; -import org.apache.ambari.server.actionmanager.HostRoleStatus; -import org.apache.ambari.server.orm.RequiresSession; -import org.apache.ambari.server.orm.entities.HostEntity; -import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; -import org.apache.ambari.server.orm.entities.StageEntity; -import javax.persistence.EntityManager; -import javax.persistence.TypedQuery; +import static org.apache.ambari.server.orm.DBAccessor.DbType.ORACLE; +import static org.apache.ambari.server.orm.dao.DaoUtils.ORACLE_LIST_LIMIT; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.apache.ambari.server.orm.DBAccessor.DbType.ORACLE; -import static org.apache.ambari.server.orm.dao.DaoUtils.ORACLE_LIST_LIMIT; + +import javax.persistence.EntityManager; +import javax.persistence.TypedQuery; + +import org.apache.ambari.server.actionmanager.HostRoleStatus; +import org.apache.ambari.server.orm.RequiresSession; +import org.apache.ambari.server.orm.entities.HostEntity; +import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; +import org.apache.ambari.server.orm.entities.StageEntity; + +import com.google.common.collect.Lists; +import com.google.inject.Inject; +import com.google.inject.Provider; +import com.google.inject.Singleton; +import com.google.inject.persist.Transactional; @Singleton public class HostRoleCommandDAO { @@ -194,6 +198,40 @@ public class HostRoleCommandDAO { return daoUtils.selectList(query, requestId); } + /** + * Gets the commands in a particular status. + * + * @param statuses + * the statuses to include (not {@code null}). + * @return the commands in the given set of statuses. + */ + @RequiresSession + public List findByStatus( + Collection statuses) { + TypedQuery query = entityManagerProvider.get().createNamedQuery( + "HostRoleCommandEntity.findByCommandStatuses", + HostRoleCommandEntity.class); + + query.setParameter("statuses", statuses); + return daoUtils.selectList(query); + } + + /** + * Gets the number of commands in a particular status. + * + * @param statuses + * the statuses to include (not {@code null}). + * @return the count of commands in the given set of statuses. + */ + @RequiresSession + public Number getCountByStatus(Collection statuses) { + TypedQuery query = entityManagerProvider.get().createNamedQuery( + "HostRoleCommandEntity.findCountByCommandStatuses", Number.class); + + query.setParameter("statuses", statuses); + return daoUtils.selectSingle(query); + } + @RequiresSession public List findAll() { return daoUtils.selectAll(entityManagerProvider.get(), HostRoleCommandEntity.class); http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java index 900dbeb..621ff1c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java @@ -18,21 +18,24 @@ package org.apache.ambari.server.orm.dao; -import com.google.inject.Inject; -import com.google.inject.Provider; -import com.google.inject.Singleton; -import com.google.inject.persist.Transactional; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.persistence.EntityManager; +import javax.persistence.TypedQuery; + import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.orm.RequiresSession; import org.apache.ambari.server.orm.entities.StageEntity; import org.apache.ambari.server.orm.entities.StageEntityPK; import org.apache.ambari.server.utils.StageUtils; -import javax.persistence.EntityManager; -import javax.persistence.TypedQuery; -import java.util.HashMap; -import java.util.List; -import java.util.Collection; -import java.util.Map; + +import com.google.inject.Inject; +import com.google.inject.Provider; +import com.google.inject.Singleton; +import com.google.inject.persist.Transactional; @Singleton public class StageDAO { @@ -82,12 +85,13 @@ public class StageDAO { } @RequiresSession - public List findByCommandStatuses(Collection statuses) { - TypedQuery query = entityManagerProvider.get().createQuery("SELECT stage " + - "FROM StageEntity stage WHERE stage.stageId IN (SELECT hrce.stageId FROM " + - "HostRoleCommandEntity hrce WHERE stage.requestId = hrce.requestId and hrce.status IN ?1 ) " + - "ORDER BY stage.requestId, stage.stageId", StageEntity.class); - return daoUtils.selectList(query, statuses); + public List findByCommandStatuses( + Collection statuses) { + TypedQuery query = entityManagerProvider.get().createNamedQuery( + "StageEntity.findByCommandStatuses", StageEntity.class); + + query.setParameter("statuses", statuses); + return daoUtils.selectList(query); } @RequiresSession @@ -114,10 +118,12 @@ public class StageDAO { "SELECT stage.requestContext " + "FROM StageEntity stage " + "WHERE stage.requestId=?1", String.class); String result = daoUtils.selectOne(query, requestId); - if (result != null) + if (result != null) { return result; - else + } + else { return ""; // Since it is defined as empty string in the StageEntity + } } @Transactional http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java index 599156a..375d895 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java @@ -36,6 +36,8 @@ import javax.persistence.JoinColumn; import javax.persistence.JoinColumns; import javax.persistence.Lob; import javax.persistence.ManyToOne; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; import javax.persistence.OneToOne; import javax.persistence.Table; import javax.persistence.TableGenerator; @@ -45,15 +47,17 @@ import org.apache.ambari.server.RoleCommand; import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.commons.lang.ArrayUtils; -@Table(name = "host_role_command") @Entity +@Table(name = "host_role_command") @TableGenerator(name = "host_role_command_id_generator", table = "ambari_sequences", pkColumnName = "sequence_name", valueColumnName = "sequence_value" , pkColumnValue = "host_role_command_id_seq" , initialValue = 1 , allocationSize = 50 ) - +@NamedQueries({ + @NamedQuery(name = "HostRoleCommandEntity.findCountByCommandStatuses", query = "SELECT COUNT(command.taskId) FROM HostRoleCommandEntity command WHERE command.status IN :statuses"), + @NamedQuery(name = "HostRoleCommandEntity.findByCommandStatuses", query = "SELECT command FROM HostRoleCommandEntity command WHERE command.status IN :statuses ORDER BY command.requestId, command.stageId") }) public class HostRoleCommandEntity { private static int MAX_COMMAND_DETAIL_LENGTH = 250; @@ -190,7 +194,7 @@ public class HostRoleCommandEntity { } public Role getRole() { - return Role.valueOf(this.role); + return Role.valueOf(role); } public void setRole(Role role) { @@ -317,29 +321,66 @@ public class HostRoleCommandEntity { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } HostRoleCommandEntity that = (HostRoleCommandEntity) o; - if (attemptCount != null ? !attemptCount.equals(that.attemptCount) : that.attemptCount != null) return false; - if (event != null ? !event.equals(that.event) : that.event != null) return false; - if (exitcode != null ? !exitcode.equals(that.exitcode) : that.exitcode != null) return false; - if (hostName != null ? !hostName.equals(that.hostName) : that.hostName != null) return false; - if (lastAttemptTime != null ? !lastAttemptTime.equals(that.lastAttemptTime) : that.lastAttemptTime != null) + if (attemptCount != null ? !attemptCount.equals(that.attemptCount) : that.attemptCount != null) { + return false; + } + if (event != null ? !event.equals(that.event) : that.event != null) { + return false; + } + if (exitcode != null ? !exitcode.equals(that.exitcode) : that.exitcode != null) { + return false; + } + if (hostName != null ? !hostName.equals(that.hostName) : that.hostName != null) { + return false; + } + if (lastAttemptTime != null ? !lastAttemptTime.equals(that.lastAttemptTime) : that.lastAttemptTime != null) { + return false; + } + if (requestId != null ? !requestId.equals(that.requestId) : that.requestId != null) { + return false; + } + if (role != null ? !role.equals(that.role) : that.role != null) { + return false; + } + if (stageId != null ? !stageId.equals(that.stageId) : that.stageId != null) { return false; - if (requestId != null ? !requestId.equals(that.requestId) : that.requestId != null) return false; - if (role != null ? !role.equals(that.role) : that.role != null) return false; - if (stageId != null ? !stageId.equals(that.stageId) : that.stageId != null) return false; - if (startTime != null ? !startTime.equals(that.startTime) : that.startTime != null) return false; - if (status != null ? !status.equals(that.status) : that.status != null) return false; - if (stdError != null ? !Arrays.equals(stdError, that.stdError) : that.stdError != null) return false; - if (stdOut != null ? !Arrays.equals(stdOut, that.stdOut) : that.stdOut != null) return false; - if (outputLog != null ? !outputLog.equals(that.outputLog) : that.outputLog != null) return false; - if (errorLog != null ? !errorLog.equals(that.errorLog) : that.errorLog != null) return false; - if (taskId != null ? !taskId.equals(that.taskId) : that.taskId != null) return false; - if (structuredOut != null ? !Arrays.equals(structuredOut, that.structuredOut) : that.structuredOut != null) return false; - if (endTime != null ? !endTime.equals(that.endTime) : that.endTime != null) return false; + } + if (startTime != null ? !startTime.equals(that.startTime) : that.startTime != null) { + return false; + } + if (status != null ? !status.equals(that.status) : that.status != null) { + return false; + } + if (stdError != null ? !Arrays.equals(stdError, that.stdError) : that.stdError != null) { + return false; + } + if (stdOut != null ? !Arrays.equals(stdOut, that.stdOut) : that.stdOut != null) { + return false; + } + if (outputLog != null ? !outputLog.equals(that.outputLog) : that.outputLog != null) { + return false; + } + if (errorLog != null ? !errorLog.equals(that.errorLog) : that.errorLog != null) { + return false; + } + if (taskId != null ? !taskId.equals(that.taskId) : that.taskId != null) { + return false; + } + if (structuredOut != null ? !Arrays.equals(structuredOut, that.structuredOut) : that.structuredOut != null) { + return false; + } + if (endTime != null ? !endTime.equals(that.endTime) : that.endTime != null) { + return false; + } return true; } @@ -371,7 +412,7 @@ public class HostRoleCommandEntity { } public void setExecutionCommand(ExecutionCommandEntity executionCommandsByTaskId) { - this.executionCommand = executionCommandsByTaskId; + executionCommand = executionCommandsByTaskId; } public StageEntity getStage() { http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java index a7bc948..e87e28b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java @@ -18,14 +18,28 @@ package org.apache.ambari.server.orm.entities; -import javax.persistence.*; +import static org.apache.commons.lang.StringUtils.defaultString; + import java.util.Collection; -import static org.apache.commons.lang.StringUtils.defaultString; +import javax.persistence.Basic; +import javax.persistence.CascadeType; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.FetchType; +import javax.persistence.Id; +import javax.persistence.IdClass; +import javax.persistence.JoinColumn; +import javax.persistence.ManyToOne; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; +import javax.persistence.OneToMany; +import javax.persistence.Table; -@IdClass(org.apache.ambari.server.orm.entities.StageEntityPK.class) -@Table(name = "stage") @Entity +@Table(name = "stage") +@IdClass(org.apache.ambari.server.orm.entities.StageEntityPK.class) +@NamedQueries({ @NamedQuery(name = "StageEntity.findByCommandStatuses", query = "SELECT stage from StageEntity stage WHERE EXISTS (SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId ) ORDER by stage.requestId, stage.stageId") }) public class StageEntity { @Column(name = "cluster_id", updatable = false, nullable = false) @@ -47,11 +61,11 @@ public class StageEntity { @Column(name = "request_context") @Basic private String requestContext = ""; - + @Column(name = "cluster_host_info") @Basic private byte[] clusterHostInfo; - + @Column(name = "command_params") @Basic private byte[] commandParamsStage; @@ -63,7 +77,7 @@ public class StageEntity { @ManyToOne @JoinColumn(name = "request_id", referencedColumnName = "request_id", nullable = false) private RequestEntity request; - + @OneToMany(mappedBy = "stage", cascade = CascadeType.REMOVE, fetch = FetchType.LAZY) private Collection hostRoleCommands; @@ -114,7 +128,7 @@ public class StageEntity { public void setClusterHostInfo(String clusterHostInfo) { this.clusterHostInfo = clusterHostInfo.getBytes(); } - + public String getCommandParamsStage() { return commandParamsStage == null ? new String() : new String(commandParamsStage); } @@ -139,18 +153,36 @@ public class StageEntity { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } StageEntity that = (StageEntity) o; - if (clusterId != null ? !clusterId.equals(that.clusterId) : that.clusterId != null) return false; - if (logInfo != null ? !logInfo.equals(that.logInfo) : that.logInfo != null) return false; - if (requestId != null ? !requestId.equals(that.requestId) : that.requestId != null) return false; - if (stageId != null ? !stageId.equals(that.stageId) : that.stageId != null) return false; - if (clusterHostInfo != null ? !clusterHostInfo.equals(that.clusterHostInfo) : that.clusterHostInfo != null) return false; - if (commandParamsStage != null ? !commandParamsStage.equals(that.commandParamsStage) : that.commandParamsStage != null) return false; - if (hostParamsStage != null ? !hostParamsStage.equals(that.hostParamsStage) : that.hostParamsStage != null) return false; + if (clusterId != null ? !clusterId.equals(that.clusterId) : that.clusterId != null) { + return false; + } + if (logInfo != null ? !logInfo.equals(that.logInfo) : that.logInfo != null) { + return false; + } + if (requestId != null ? !requestId.equals(that.requestId) : that.requestId != null) { + return false; + } + if (stageId != null ? !stageId.equals(that.stageId) : that.stageId != null) { + return false; + } + if (clusterHostInfo != null ? !clusterHostInfo.equals(that.clusterHostInfo) : that.clusterHostInfo != null) { + return false; + } + if (commandParamsStage != null ? !commandParamsStage.equals(that.commandParamsStage) : that.commandParamsStage != null) { + return false; + } + if (hostParamsStage != null ? !hostParamsStage.equals(that.hostParamsStage) : that.hostParamsStage != null) { + return false; + } return !(requestContext != null ? !requestContext.equals(that.requestContext) : that.requestContext != null); } http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java index 36acbc2..d751f2d 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java @@ -17,15 +17,19 @@ */ package org.apache.ambari.server.actionmanager; -import com.google.inject.AbstractModule; -import com.google.inject.Guice; -import com.google.inject.Inject; -import com.google.inject.Injector; -import com.google.inject.Singleton; -import com.google.inject.persist.PersistService; -import com.google.inject.persist.UnitOfWork; -import com.google.inject.util.Modules; +import static org.apache.ambari.server.orm.DBAccessor.DbType.ORACLE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import javax.persistence.EntityManager; + import junit.framework.Assert; + import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.Role; import org.apache.ambari.server.RoleCommand; @@ -40,6 +44,7 @@ import org.apache.ambari.server.orm.DBAccessor; import org.apache.ambari.server.orm.DBAccessorImpl; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; +import org.apache.ambari.server.orm.dao.DaoUtils; import org.apache.ambari.server.orm.dao.ExecutionCommandDAO; import org.apache.ambari.server.orm.dao.HostRoleCommandDAO; import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; @@ -51,13 +56,16 @@ import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.apache.ambari.server.orm.DBAccessor.DbType.ORACLE; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Provider; +import com.google.inject.Singleton; +import com.google.inject.persist.PersistService; +import com.google.inject.persist.UnitOfWork; +import com.google.inject.util.Modules; public class TestActionDBAccessorImpl { private static final Logger log = LoggerFactory.getLogger(TestActionDBAccessorImpl.class); @@ -73,11 +81,19 @@ public class TestActionDBAccessorImpl { @Inject private Clusters clusters; + @Inject private ExecutionCommandDAO executionCommandDAO; + @Inject private HostRoleCommandDAO hostRoleCommandDAO; + @Inject + private Provider entityManagerProvider; + + @Inject + private DaoUtils daoUtils; + @Before public void setup() throws AmbariException { InMemoryDefaultTestModule defaultTestModule = new InMemoryDefaultTestModule(); @@ -157,30 +173,106 @@ public class TestActionDBAccessorImpl { "(command report status should be ignored)", HostRoleStatus.ABORTED,s.getHostRoleStatus(hostname, "HBASE_MASTER")); } - + @Test public void testGetStagesInProgress() throws AmbariException { - String hostname = "host1"; List stages = new ArrayList(); - stages.add(createStubStage(hostname, requestId, stageId)); - stages.add(createStubStage(hostname, requestId, stageId + 1)); + stages.add(createStubStage(hostName, requestId, stageId)); + stages.add(createStubStage(hostName, requestId, stageId + 1)); Request request = new Request(stages, clusters); db.persistActions(request); assertEquals(2, stages.size()); } - + @Test public void testGetStagesInProgressWithFailures() throws AmbariException { - String hostname = "host1"; - populateActionDB(db, hostname, requestId, stageId); - populateActionDB(db, hostname, requestId+1, stageId); - db.abortOperation(requestId); + populateActionDB(db, hostName, requestId, stageId); + populateActionDB(db, hostName, requestId + 1, stageId); List stages = db.getStagesInProgress(); + assertEquals(2, stages.size()); + + db.abortOperation(requestId); + stages = db.getStagesInProgress(); assertEquals(1, stages.size()); assertEquals(requestId+1, stages.get(0).getRequestId()); } @Test + public void testGetStagesInProgressWithManyStages() throws AmbariException { + // create 3 request; each request will have 3 stages, each stage 2 commands + populateActionDBMultipleStages(3, db, hostName, requestId, stageId); + populateActionDBMultipleStages(3, db, hostName, requestId + 1, stageId + 3); + populateActionDBMultipleStages(3, db, hostName, requestId + 2, stageId + 3); + + // verify stages and proper ordering + int commandsInProgressCount = db.getCommandsInProgressCount(); + List stages = db.getStagesInProgress(); + assertEquals(18, commandsInProgressCount); + assertEquals(9, stages.size()); + + long lastRequestId = Integer.MIN_VALUE; + for (Stage stage : stages) { + assertTrue(stage.getRequestId() >= lastRequestId); + lastRequestId = stage.getRequestId(); + } + + // cancel the first one, removing 3 stages + db.abortOperation(requestId); + + // verify stages and proper ordering + commandsInProgressCount = db.getCommandsInProgressCount(); + stages = db.getStagesInProgress(); + assertEquals(12, commandsInProgressCount); + assertEquals(6, stages.size()); + + // find the first stage, and change one command to COMPLETED + stages.get(0).setHostRoleStatus(hostName, Role.HBASE_MASTER.toString(), + HostRoleStatus.COMPLETED); + + db.hostRoleScheduled(stages.get(0), hostName, Role.HBASE_MASTER.toString()); + + // the first stage still has at least 1 command IN_PROGRESS + commandsInProgressCount = db.getCommandsInProgressCount(); + stages = db.getStagesInProgress(); + assertEquals(11, commandsInProgressCount); + assertEquals(6, stages.size()); + + // find the first stage, and change the other command to COMPLETED + stages.get(0).setHostRoleStatus(hostName, + Role.HBASE_REGIONSERVER.toString(), HostRoleStatus.COMPLETED); + + db.hostRoleScheduled(stages.get(0), hostName, + Role.HBASE_REGIONSERVER.toString()); + + // verify stages and proper ordering + commandsInProgressCount = db.getCommandsInProgressCount(); + stages = db.getStagesInProgress(); + assertEquals(10, commandsInProgressCount); + assertEquals(5, stages.size()); + } + + @Test + public void testGetStagesInProgressWithManyCommands() throws AmbariException { + // 1000 hosts + for (int i = 0; i < 1000; i++) { + String hostName = "c64-" + i; + clusters.addHost(hostName); + clusters.getHost(hostName).persist(); + } + + // create 1 request, 3 stages per host, each with 2 commands + for (int i = 0; i < 1000; i++) { + String hostName = "c64-" + i; + populateActionDBMultipleStages(3, db, hostName, requestId + i, stageId); + } + + int commandsInProgressCount = db.getCommandsInProgressCount(); + List stages = db.getStagesInProgress(); + assertEquals(6000, commandsInProgressCount); + assertEquals(3000, stages.size()); + } + + @Test public void testPersistActions() throws AmbariException { populateActionDB(db, hostName, requestId, stageId); for (Stage stage : db.getAllStages(requestId)) { @@ -310,7 +402,7 @@ public class TestActionDBAccessorImpl { populateActionDB(db, hostName, requestId + 1, stageId); List requestIdsResult = db.getRequestsByStatus(null, BaseRequest.DEFAULT_PAGE_SIZE, false); - + assertNotNull("List of request IDs is null", requestIdsResult); assertEquals("Request IDs not matches", requestIds, requestIdsResult); } @@ -488,6 +580,20 @@ public class TestActionDBAccessorImpl { db.persistActions(request); } + private void populateActionDBMultipleStages(int numberOfStages, + ActionDBAccessor db, String hostname, long requestId, long stageId) + throws AmbariException { + + List stages = new ArrayList(); + for (int i = 0; i < numberOfStages; i++) { + Stage stage = createStubStage(hostname, requestId, stageId + i); + stages.add(stage); + } + + Request request = new Request(stages, clusters); + db.persistActions(request); + } + private Stage createStubStage(String hostname, long requestId, long stageId) { Stage s = new Stage(requestId, "/a/b", "cluster1", 1L, "action db accessor test", "clusterHostInfo", "commandParamsStage", "hostParamsStage"); http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java index 7224924..e1db9f8 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java @@ -17,13 +17,21 @@ */ package org.apache.ambari.server.actionmanager; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyCollectionOf; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.*; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import java.lang.reflect.Type; import java.util.ArrayList; import java.util.Collection; @@ -36,9 +44,8 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import com.google.common.reflect.TypeToken; -import com.google.inject.persist.UnitOfWork; import junit.framework.Assert; + import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.Role; import org.apache.ambari.server.RoleCommand; @@ -69,7 +76,6 @@ import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEve import org.apache.ambari.server.utils.StageUtils; import org.easymock.Capture; import org.easymock.EasyMock; -import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; @@ -77,6 +83,9 @@ import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.reflect.TypeToken; +import com.google.inject.persist.UnitOfWork; + public class TestActionScheduler { private static final Logger log = LoggerFactory.getLogger(TestActionScheduler.class); @@ -96,7 +105,7 @@ public class TestActionScheduler { */ @Test public void testActionSchedule() throws Exception { - + Type type = new TypeToken>>() {}.getType(); Map> clusterHostInfo = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type); @@ -116,7 +125,7 @@ public class TestActionScheduler { when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); when(scomp.getServiceComponentHost(anyString())).thenReturn(sch); when(serviceObj.getCluster()).thenReturn(oneClusterMock); - + Host host = mock(Host.class); HashMap hosts = new HashMap(); @@ -132,6 +141,7 @@ public class TestActionScheduler { Stage s = StageUtils.getATestStage(1, 977, hostname, CLUSTER_HOST_INFO, "{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}"); stages.add(s); + when(db.getCommandsInProgressCount()).thenReturn(stages.size()); when(db.getStagesInProgress()).thenReturn(stages); Request request = mock(Request.class); @@ -167,7 +177,7 @@ public class TestActionScheduler { int expectedQueueSize, ActionScheduler scheduler) { int cycleCount = 0; while (cycleCount++ <= MAX_CYCLE_ITERATIONS) { - List ac = aq.dequeueAll(hostname); + List ac = aq.dequeueAll(hostname); if (ac != null) { if (ac.size() == expectedQueueSize) { return ac; @@ -220,6 +230,7 @@ public class TestActionScheduler { stages.add(s); ActionDBAccessor db = mock(ActionDBAccessor.class); + when(db.getCommandsInProgressCount()).thenReturn(stages.size()); when(db.getStagesInProgress()).thenReturn(stages); Request request = mock(Request.class); when(request.isExclusive()).thenReturn(false); @@ -292,7 +303,7 @@ public class TestActionScheduler { Request request = mock(Request.class); when(request.isExclusive()).thenReturn(false); when(db.getRequest(anyLong())).thenReturn(request); - + when(db.getCommandsInProgressCount()).thenReturn(stages.size()); when(db.getStagesInProgress()).thenReturn(stages); doAnswer(new Answer() { @Override @@ -379,7 +390,7 @@ public class TestActionScheduler { Request request = mock(Request.class); when(request.isExclusive()).thenReturn(false); when(db.getRequest(anyLong())).thenReturn(request); - + when(db.getCommandsInProgressCount()).thenReturn(stages.size()); when(db.getStagesInProgress()).thenReturn(stages); doAnswer(new Answer() { @@ -508,7 +519,7 @@ public class TestActionScheduler { Request request = mock(Request.class); when(request.isExclusive()).thenReturn(false); when(db.getRequest(anyLong())).thenReturn(request); - + when(db.getCommandsInProgressCount()).thenReturn(stages.size()); when(db.getStagesInProgress()).thenReturn(stages); doAnswer(new Answer() { @Override @@ -572,7 +583,7 @@ public class TestActionScheduler { Request request = mock(Request.class); when(request.isExclusive()).thenReturn(false); when(db.getRequest(anyLong())).thenReturn(request); - + when(db.getCommandsInProgressCount()).thenReturn(stages.size()); when(db.getStagesInProgress()).thenReturn(stages); doAnswer(new Answer() { @Override @@ -680,7 +691,7 @@ public class TestActionScheduler { Request request = mock(Request.class); when(request.isExclusive()).thenReturn(false); when(db.getRequest(anyLong())).thenReturn(request); - + when(db.getCommandsInProgressCount()).thenReturn(stages.size()); when(db.getStagesInProgress()).thenReturn(stages); Properties properties = new Properties(); @@ -764,7 +775,7 @@ public class TestActionScheduler { Request request = mock(Request.class); when(request.isExclusive()).thenReturn(false); when(db.getRequest(anyLong())).thenReturn(request); - + when(db.getCommandsInProgressCount()).thenReturn(stages.size()); when(db.getStagesInProgress()).thenReturn(stages); Properties properties = new Properties(); @@ -805,7 +816,7 @@ public class TestActionScheduler { when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); when(scomp.getServiceComponentHost(anyString())).thenReturn(sch); when(serviceObj.getCluster()).thenReturn(oneClusterMock); - + String hostname1 = "ahost.ambari.apache.org"; String hostname2 = "bhost.ambari.apache.org"; HashMap hosts = @@ -813,48 +824,48 @@ public class TestActionScheduler { hosts.put(hostname1, sch); hosts.put(hostname2, sch); when(scomp.getServiceComponentHosts()).thenReturn(hosts); - + List stages = new ArrayList(); Stage backgroundStage = null; stages.add(//stage with background command backgroundStage = getStageWithSingleTask( hostname1, "cluster1", Role.NAMENODE, RoleCommand.CUSTOM_COMMAND, "REBALANCEHDFS", Service.Type.HDFS, 1, 1, 1)); - + Assert.assertEquals(AgentCommandType.BACKGROUND_EXECUTION_COMMAND ,backgroundStage.getExecutionCommands(hostname1).get(0).getExecutionCommand().getCommandType()); - + stages.add( // Stage with the same hostname, should be scheduled getStageWithSingleTask( hostname1, "cluster1", Role.GANGLIA_MONITOR, RoleCommand.START, Service.Type.GANGLIA, 2, 2, 2)); - + stages.add( getStageWithSingleTask( hostname2, "cluster1", Role.DATANODE, RoleCommand.START, Service.Type.HDFS, 3, 3, 3)); - - + + ActionDBAccessor db = mock(ActionDBAccessor.class); Request request = mock(Request.class); when(request.isExclusive()).thenReturn(false); when(db.getRequest(anyLong())).thenReturn(request); - + when(db.getCommandsInProgressCount()).thenReturn(stages.size()); when(db.getStagesInProgress()).thenReturn(stages); - + Properties properties = new Properties(); properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "true"); Configuration conf = new Configuration(properties); ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, conf); - + ActionManager am = new ActionManager( 2, 2, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, requestFactory, conf); - + scheduler.doWork(); - + Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(hostname1, "NAMENODE")); Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(2).getHostRoleStatus(hostname2, "DATANODE")); @@ -901,7 +912,7 @@ public class TestActionScheduler { Request request = mock(Request.class); when(request.isExclusive()).thenReturn(false); when(db.getRequest(anyLong())).thenReturn(request); - + when(db.getCommandsInProgressCount()).thenReturn(stages.size()); when(db.getStagesInProgress()).thenReturn(stages); doAnswer(new Answer() { @Override @@ -1082,7 +1093,7 @@ public class TestActionScheduler { Request request = mock(Request.class); when(request.isExclusive()).thenReturn(false); when(db.getRequest(anyLong())).thenReturn(request); - + when(db.getCommandsInProgressCount()).thenReturn(stages.size()); when(db.getStagesInProgress()).thenReturn(stages); doAnswer(new Answer() { @Override @@ -1259,7 +1270,7 @@ public class TestActionScheduler { Request request = mock(Request.class); when(request.isExclusive()).thenReturn(false); when(db.getRequest(anyLong())).thenReturn(request); - + when(db.getCommandsInProgressCount()).thenReturn(stages.size()); when(db.getStagesInProgress()).thenReturn(stages); doAnswer(new Answer() { @Override @@ -1419,7 +1430,7 @@ public class TestActionScheduler { assertEquals(new Float(1.0), new Float(s.getSuccessFactor(Role.NAMENODE))); assertEquals(new Float(1.0), new Float(s.getSuccessFactor(Role.GANGLIA_SERVER))); } - + @Test public void testSuccessCriteria() { RoleStats rs1 = new RoleStats(1, (float)0.5); @@ -1427,37 +1438,37 @@ public class TestActionScheduler { assertTrue(rs1.isSuccessFactorMet()); rs1.numSucceeded = 0; assertFalse(rs1.isSuccessFactorMet()); - + RoleStats rs2 = new RoleStats(2, (float)0.5); rs2.numSucceeded = 1; assertTrue(rs2.isSuccessFactorMet()); - + RoleStats rs3 = new RoleStats(3, (float)0.5); rs3.numSucceeded = 2; assertTrue(rs2.isSuccessFactorMet()); rs3.numSucceeded = 1; assertFalse(rs3.isSuccessFactorMet()); - + RoleStats rs4 = new RoleStats(3, (float)1.0); rs4.numSucceeded = 2; assertFalse(rs3.isSuccessFactorMet()); } - + /** * This test sends verifies that ActionScheduler returns up-to-date cluster host info and caching works correctly. */ @Test public void testClusterHostInfoCache() throws Exception { - + Type type = new TypeToken>>() {}.getType(); - + //Data for stages Map> clusterHostInfo1 = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type); Map> clusterHostInfo2 = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO_UPDATED, type); int stageId = 1; int requestId1 = 1; int requestId2 = 2; - + ActionQueue aq = new ActionQueue(); Properties properties = new Properties(); Configuration conf = new Configuration(properties); @@ -1492,6 +1503,7 @@ public class TestActionScheduler { "{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}"); Stage s2 = StageUtils.getATestStage(requestId2, stageId, hostname, CLUSTER_HOST_INFO_UPDATED, "{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}"); + when(db.getCommandsInProgressCount()).thenReturn(1); when(db.getStagesInProgress()).thenReturn(Collections.singletonList(s1)); //Keep large number of attempts so that the task is not expired finally @@ -1504,12 +1516,12 @@ public class TestActionScheduler { assertTrue(ac.get(0) instanceof ExecutionCommand); assertEquals(String.valueOf(requestId1) + "-" + stageId, ((ExecutionCommand) (ac.get(0))).getCommandId()); - + assertEquals(clusterHostInfo1, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo()); - + when(db.getCommandsInProgressCount()).thenReturn(1); when(db.getStagesInProgress()).thenReturn(Collections.singletonList(s2)); - + //Verify that ActionSheduler does not return cached value of cluster host info for new requestId ac = waitForQueueSize(hostname, aq, 1, scheduler); assertTrue(ac.get(0) instanceof ExecutionCommand); @@ -1572,7 +1584,7 @@ public class TestActionScheduler { Request request = mock(Request.class); when(request.isExclusive()).thenReturn(false); when(db.getRequest(anyLong())).thenReturn(request); - + when(db.getCommandsInProgressCount()).thenReturn(stages.size()); when(db.getStagesInProgress()).thenReturn(stages); ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3, @@ -1655,7 +1667,7 @@ public class TestActionScheduler { Request request = mock(Request.class); when(request.isExclusive()).thenReturn(false); when(db.getRequest(anyLong())).thenReturn(request); - + when(db.getCommandsInProgressCount()).thenReturn(stages.size()); when(db.getStagesInProgress()).thenReturn(stages); doAnswer(new Answer() { @Override @@ -1728,7 +1740,7 @@ public class TestActionScheduler { Request request = mock(Request.class); when(request.isExclusive()).thenReturn(false); when(db.getRequest(anyLong())).thenReturn(request); - + when(db.getCommandsInProgressCount()).thenReturn(stages.size()); when(db.getStagesInProgress()).thenReturn(stages); List requestTasks = new ArrayList(); @@ -1894,7 +1906,7 @@ public class TestActionScheduler { when(host3.getHostName()).thenReturn(hostname); ActionDBAccessor db = mock(ActionDBAccessor.class); - + when(db.getCommandsInProgressCount()).thenReturn(stagesInProgress.size()); when(db.getStagesInProgress()).thenReturn(stagesInProgress); List requestTasks = new ArrayList();