ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jonathanhur...@apache.org
Subject ambari git commit: AMBARI-20712 - Parallel Requests With Intersecting Hosts Don't Block Correctly (jonathanhurley)
Date Sun, 09 Apr 2017 02:25:20 GMT
Repository: ambari
Updated Branches:
  refs/heads/branch-2.5 d7f64698a -> b4a302216


AMBARI-20712 - Parallel Requests With Intersecting Hosts Don't Block Correctly (jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b4a30221
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b4a30221
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b4a30221

Branch: refs/heads/branch-2.5
Commit: b4a3022166d7f9be4ad3e4b6bcb5a6213700c21d
Parents: d7f6469
Author: Jonathan Hurley <jhurley@hortonworks.com>
Authored: Sat Apr 8 15:17:14 2017 -0400
Committer: Jonathan Hurley <jhurley@hortonworks.com>
Committed: Sat Apr 8 22:04:07 2017 -0400

----------------------------------------------------------------------
 .../server/actionmanager/ActionScheduler.java   | 150 ++++++++-----------
 .../server/actionmanager/HostRoleCommand.java   |  26 ++++
 .../AmbariCustomCommandExecutionHelper.java     |   1 +
 .../server/orm/dao/HostRoleCommandDAO.java      |  49 ++++++
 .../orm/entities/HostRoleCommandEntity.java     |  33 +++-
 .../main/resources/Ambari-DDL-Derby-CREATE.sql  |   1 +
 .../main/resources/Ambari-DDL-MySQL-CREATE.sql  |   1 +
 .../main/resources/Ambari-DDL-Oracle-CREATE.sql |   1 +
 .../resources/Ambari-DDL-Postgres-CREATE.sql    |   1 +
 .../resources/Ambari-DDL-SQLAnywhere-CREATE.sql |   1 +
 .../resources/Ambari-DDL-SQLServer-CREATE.sql   |   1 +
 .../actionmanager/TestActionScheduler.java      | 150 +++++++++++--------
 12 files changed, 260 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/b4a30221/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 8f0953c..6eaf485 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -41,7 +41,6 @@ import org.apache.ambari.server.ServiceComponentHostNotFoundException;
 import org.apache.ambari.server.ServiceComponentNotFoundException;
 import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.AgentCommand;
-import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
 import org.apache.ambari.server.agent.CancelCommand;
 import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.agent.ExecutionCommand;
@@ -347,13 +346,13 @@ class ActionScheduler implements Runnable {
       }
 
       Set<Long> runningRequestIds = new HashSet<Long>();
-      List<Stage> stages = db.getFirstStageInProgressPerRequest();
+      List<Stage> firstStageInProgressPerRequest = db.getFirstStageInProgressPerRequest();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Scheduler wakes up");
-        LOG.debug("Processing {} in progress stages ", stages.size());
+        LOG.debug("Processing {} in progress stages", firstStageInProgressPerRequest.size());
       }
 
-      if (stages.isEmpty()) {
+      if (firstStageInProgressPerRequest.isEmpty()) {
         // Nothing to do
         if (LOG.isDebugEnabled()) {
           LOG.debug("There are no stages currently in progress.");
@@ -365,11 +364,19 @@ class ActionScheduler implements Runnable {
 
       int i_stage = 0;
 
-      HashSet<String> hostsWithTasks = getListOfHostsWithPendingTask(stages);
-      actionQueue.updateListOfHostsWithPendingTask(hostsWithTasks);
+      // get the range of requests in progress
+      long iLowestRequestIdInProgress = firstStageInProgressPerRequest.get(0).getRequestId();
+      long iHighestRequestIdInProgress = firstStageInProgressPerRequest.get(
+          firstStageInProgressPerRequest.size() - 1).getRequestId();
 
-      stages = filterParallelPerHostStages(stages);
-      // At this point the stages is a filtered list
+      List<String> hostsWithPendingTasks = hostRoleCommandDAO.getHostsWithPendingTasks(
+          iLowestRequestIdInProgress, iHighestRequestIdInProgress);
+
+      actionQueue.updateListOfHostsWithPendingTask(new HashSet<>(hostsWithPendingTasks));
+
+      // filter the stages in progress down to those which can be scheduled in
+      // parallel
+      List<Stage> stages = filterParallelPerHostStages(firstStageInProgressPerRequest);
 
       boolean exclusiveRequestIsGoing = false;
       // This loop greatly depends on the fact that order of stages in
@@ -534,123 +541,86 @@ class ActionScheduler implements Runnable {
 
 
   /**
-   * Returns the list of hosts that have a task assigned
-   *
-   * @param stages
-   * @return
-   */
-  private HashSet<String> getListOfHostsWithPendingTask(List<Stage> stages) {
-    HashSet<String> hostsWithTasks = new HashSet<String>();
-    for (Stage s : stages) {
-      hostsWithTasks.addAll(s.getHosts());
-    }
-    return hostsWithTasks;
-  }
-
-  /**
-   * Returns filtered list of stages such that the returned list is an ordered list of stages that may
-   * be executed in parallel or in the order in which they are presented
+   * Returns filtered list of stages such that the returned list is an ordered
+   * list of stages that may be executed in parallel or in the order in which
+   * they are presented.
    * <p/>
-   * Assumption: the list of stages supplied as input are ordered by request id and then stage id.
-   * <p/>
-   * Rules:
+   * The following rules will be applied to the list:
    * <ul>
-   * <li>
-   * Stages are filtered such that the first stage in the list (assumed to be the first pending
-   * stage from the earliest active request) has priority
-   * </li>
-   * <li>
-   * No stage in any request may be executed before an earlier stage in the same request
-   * </li>
-   * <li>
-   * A stages in different requests may be performed in parallel if the relevant hosts for the
-   * stage in the later requests do not intersect with the union of hosts from (pending) stages
-   * in earlier requests
+   * <li>Stages are filtered such that the first stage in the list (assumed to
+   * be the first pending stage from the earliest active request) has priority.
    * </li>
+   * <li>No stage in any request may be executed before an earlier stage in the
+   * same request. This requirement is automatically covered by virtue of the
+   * supplied stages only being for the next stage in progress per request.</li>
+   * <li>A stage in a different request may be performed in parallel
+   * if-and-only-if the relevant hosts for the stage in the later requests do
+   * not intersect with the union of hosts from (pending) stages in earlier
+   * requests. To accomplish this, blocking hosts are queried per request.</li>
    * </ul>
    *
-   * @param stages the stages to process
+   * @param firstStageInProgressPerRequest
+   *          the stages to process, one stage per request
    * @return a list of stages that may be executed in parallel
    */
-  private List<Stage> filterParallelPerHostStages(List<Stage> stages) {
-    List<Stage> retVal = new ArrayList<Stage>();
-    Set<String> affectedHosts = new HashSet<String>();
-    Set<Long> affectedRequests = new HashSet<Long>();
+  private List<Stage> filterParallelPerHostStages(List<Stage> firstStageInProgressPerRequest) {
+    // if there's only 1 stage in progress in 1 request, simply return that stage
+    if (firstStageInProgressPerRequest.size() == 1) {
+      return firstStageInProgressPerRequest;
+    }
 
-    for (Stage s : stages) {
-      long requestId = s.getRequestId();
+    List<Stage> retVal = new ArrayList<>();
+
+    // set the lower range (inclusive) of requests to limit the query a bit
+    // since there can be a LOT of commands
+    long lowerRequestIdInclusive = firstStageInProgressPerRequest.get(0).getRequestId();
+
+    // determine if this stage can be scheduled in parallel with the other
+    // stages from other requests
+    for (Stage stage : firstStageInProgressPerRequest) {
+      long requestId = stage.getRequestId();
 
       if (LOG.isTraceEnabled()) {
-        LOG.trace("==> Processing stage: {}/{} ({}) for {}", requestId, s.getStageId(), s.getRequestContext());
+        LOG.trace("==> Processing stage: {}/{} ({}) for {}", requestId, stage.getStageId(), stage.getRequestContext());
       }
 
       boolean addStage = true;
 
+      // there are at least 2 requests in progress concurrently; determine which
+      // hosts are affected
+      HashSet<String> hostsInProgressForEarlierRequests = new HashSet<>(
+          hostRoleCommandDAO.getBlockingHostsForRequest(lowerRequestIdInclusive, requestId));
+
       // Iterate over the relevant hosts for this stage to see if any intersect with the set of
       // hosts needed for previous stages.  If any intersection occurs, this stage may not be
       // executed in parallel.
-      for (String host : s.getHosts()) {
+      for (String host : stage.getHosts()) {
         LOG.trace("===> Processing Host {}", host);
 
-        if (affectedHosts.contains(host)) {
+        if (hostsInProgressForEarlierRequests.contains(host)) {
           if (LOG.isTraceEnabled()) {
-            LOG.trace("===>  Skipping stage since it utilizes at least one host that a previous stage requires: {}/{} ({})", s.getRequestId(), s.getStageId(), s.getRequestContext());
-          }
-
-          addStage &= false;
-        } else {
-          if (!Stage.INTERNAL_HOSTNAME.equalsIgnoreCase(host) && !isStageHasBackgroundCommandsOnly(s, host)) {
-            LOG.trace("====>  Adding host to affected hosts: {}", host);
-            affectedHosts.add(host);
+            LOG.trace("===>  Skipping stage since it utilizes at least one host that a previous stage requires: {}/{} ({})", stage.getRequestId(), stage.getStageId(), stage.getRequestContext());
           }
 
-          addStage &= true;
-        }
-      }
-
-      // If this stage is for a request that we have already processed, the it cannot execute in
-      // parallel since only one stage per request my execute at a time. The first time we encounter
-      // a request id, will be for the first pending stage for that request, so it is a candidate
-      // for execution at this time - if the previous test for host intersection succeeds.
-      if (affectedRequests.contains(requestId)) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("===>  Skipping stage since the request it is in has been processed already: {}/{} ({})", s.getRequestId(), s.getStageId(), s.getRequestContext());
-        }
-
-        addStage = false;
-      } else {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("====>  Adding request to affected requests: {}", requestId);
+          addStage = false;
+          break;
         }
-
-        affectedRequests.add(requestId);
-        addStage &= true;
       }
 
-      // If both tests pass - the stage is the first pending stage in its request and the hosts
-      // required in the stage do not intersect with hosts from stages that should occur before this,
-      // than add it to the list of stages that may be executed in parallel.
+      // add the stage if no other prior stages for prior requests intersect the
+      // hosts in this stage
       if (addStage) {
         if (LOG.isTraceEnabled()) {
-          LOG.trace("===>  Adding stage to return value: {}/{} ({})", s.getRequestId(), s.getStageId(), s.getRequestContext());
+          LOG.trace("===>  Adding stage to return value: {}/{} ({})", stage.getRequestId(), stage.getStageId(), stage.getRequestContext());
         }
 
-        retVal.add(s);
+        retVal.add(stage);
       }
     }
 
     return retVal;
   }
 
-  private boolean isStageHasBackgroundCommandsOnly(Stage s, String host) {
-    for (ExecutionCommandWrapper c : s.getExecutionCommands(host)) {
-      if (c.getCommandType() != AgentCommandType.BACKGROUND_EXECUTION_COMMAND) {
-        return false;
-      }
-    }
-    return true;
-  }
-
   private boolean hasPreviousStageFailed(Stage stage) {
     boolean failed = false;
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/b4a30221/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
index 651eb24..87a6edf 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
@@ -68,6 +68,7 @@ public class HostRoleCommand {
   private String commandDetail;
   private String customCommandName;
   private ExecutionCommandWrapper executionCommandWrapper;
+  private boolean isBackgroundCommand = false;
 
   @Inject
   private ExecutionCommandDAO executionCommandDAO;
@@ -179,6 +180,7 @@ public class HostRoleCommand {
     event = new ServiceComponentHostEventWrapper(hostRoleCommandEntity.getEvent());
     commandDetail = hostRoleCommandEntity.getCommandDetail();
     customCommandName = hostRoleCommandEntity.getCustomCommandName();
+    isBackgroundCommand = hostRoleCommandEntity.isBackgroundCommand();
   }
 
   //todo: why is this not symmetrical with the constructor which takes an entity
@@ -201,6 +203,7 @@ public class HostRoleCommand {
     hostRoleCommandEntity.setRoleCommand(roleCommand);
     hostRoleCommandEntity.setCommandDetail(commandDetail);
     hostRoleCommandEntity.setCustomCommandName(customCommandName);
+    hostRoleCommandEntity.setBackgroundCommand(isBackgroundCommand);
 
     HostEntity hostEntity = hostDAO.findById(hostId);
     if (null != hostEntity) {
@@ -433,6 +436,29 @@ public class HostRoleCommand {
   }
 
   /**
+   * Gets whether this command runs in the background and does not block other
+   * commands.
+   *
+   * @return {@code true} if this command runs in the background, {@code false}
+   *         otherwise.
+   */
+  public boolean isBackgroundCommand() {
+    return isBackgroundCommand;
+  }
+
+  /**
+   * Sets whether this command runs in the background and does not block other
+   * commands.
+   *
+   * @param isBackgroundCommand
+   *          {@code true} if this command runs in the background, {@code false}
+   *          otherwise.
+   */
+  public void setBackgroundCommand(boolean isBackgroundCommand) {
+    this.isBackgroundCommand = isBackgroundCommand;
+  }
+
+  /**
    * Gets whether commands which fail and are retryable are automatically
    * skipped and marked with {@link HostRoleStatus#SKIPPED_FAILED}.
    *

http://git-wip-us.apache.org/repos/asf/ambari/blob/b4a30221/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
index 867ebff..b82ce0f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
@@ -376,6 +376,7 @@ public class AmbariCustomCommandExecutionHelper {
 
       //set type background
       if(customCommandDefinition != null && customCommandDefinition.isBackground()){
+        cmd.setBackgroundCommand(true);
         execCmd.setCommandType(AgentCommandType.BACKGROUND_EXECUTION_COMMAND);
       }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/b4a30221/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index 02c4091..30b3100 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -41,6 +41,7 @@ import org.apache.ambari.annotations.TransactionalLock.LockArea;
 import org.apache.ambari.annotations.TransactionalLock.LockType;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
 import org.apache.ambari.server.api.query.JpaPredicateVisitor;
 import org.apache.ambari.server.api.query.JpaSortBuilder;
 import org.apache.ambari.server.configuration.Configuration;
@@ -853,6 +854,54 @@ public class HostRoleCommandDAO {
   }
 
   /**
+   * Gets a list of hosts with commands in progress given a range of requests.
+   * The range of requests should include all requests with at least 1 stage in
+   * progress.
+   *
+   * @return the list of hosts with commands in progress.
+   * @see HostRoleStatus#IN_PROGRESS_STATUSES
+   */
+  @RequiresSession
+  public List<String> getHostsWithPendingTasks(long iLowestRequestIdInProgress,
+      long iHighestRequestIdInProgress) {
+    TypedQuery<String> query = entityManagerProvider.get().createNamedQuery(
+        "HostRoleCommandEntity.findHostsByCommandStatus", String.class);
+
+    query.setParameter("iLowestRequestIdInProgress", iLowestRequestIdInProgress);
+    query.setParameter("iHighestRequestIdInProgress", iHighestRequestIdInProgress);
+    query.setParameter("statuses", HostRoleStatus.IN_PROGRESS_STATUSES);
+    return daoUtils.selectList(query);
+  }
+
+  /**
+   * Gets a list of hosts with commands in progress which occur before the
+   * specified request ID. This will only return commands which are not
+   * {@link AgentCommandType#BACKGROUND_EXECUTION_COMMAND} as these commands do
+   * not block future requests.
+   *
+   * @param lowerRequestIdInclusive
+   *          the lowest request ID to consider (inclusive) when getting any
+   *          blocking hosts.
+   * @param requestId
+   *          the request ID to calculate any blocking hosts for (essentially,
+   *          the upper limit exclusive)
+   * @return the list of hosts from older running requests which will block
+   *         those same hosts in the specified request ID.
+   * @see HostRoleStatus#IN_PROGRESS_STATUSES
+   */
+  @RequiresSession
+  public List<String> getBlockingHostsForRequest(long lowerRequestIdInclusive,
+      long requestId) {
+    TypedQuery<String> query = entityManagerProvider.get().createNamedQuery(
+        "HostRoleCommandEntity.getBlockingHostsForRequest", String.class);
+
+    query.setParameter("lowerRequestIdInclusive", lowerRequestIdInclusive);
+    query.setParameter("upperRequestIdExclusive", requestId);
+    query.setParameter("statuses", HostRoleStatus.IN_PROGRESS_STATUSES);
+    return daoUtils.selectList(query);
+  }
+
+  /**
    * The {@link HostRoleCommandPredicateVisitor} is used to convert an Ambari
    * {@link Predicate} into a JPA {@link javax.persistence.criteria.Predicate}.
    */

http://git-wip-us.apache.org/repos/asf/ambari/blob/b4a30221/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
index 74271b9..683f9ef 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
@@ -70,7 +70,14 @@ import org.apache.commons.lang.ArrayUtils;
     @NamedQuery(name = "HostRoleCommandEntity.findByHostRoleNullHost", query = "SELECT command FROM HostRoleCommandEntity command WHERE command.hostEntity IS NULL AND command.requestId=:requestId AND command.stageId=:stageId AND command.role=:role"),
     @NamedQuery(name = "HostRoleCommandEntity.findByStatusBetweenStages", query = "SELECT command FROM HostRoleCommandEntity command WHERE command.requestId = :requestId AND command.stageId >= :minStageId AND command.stageId <= :maxStageId AND command.status = :status"),
     @NamedQuery(name = "HostRoleCommandEntity.updateAutoSkipExcludeRoleCommand", query = "UPDATE HostRoleCommandEntity command SET command.autoSkipOnFailure = :autoSkipOnFailure WHERE command.requestId = :requestId AND command.roleCommand <> :roleCommand"),
-    @NamedQuery(name = "HostRoleCommandEntity.updateAutoSkipForRoleCommand", query = "UPDATE HostRoleCommandEntity command SET command.autoSkipOnFailure = :autoSkipOnFailure WHERE command.requestId = :requestId AND command.roleCommand = :roleCommand")
+    @NamedQuery(name = "HostRoleCommandEntity.updateAutoSkipForRoleCommand", query = "UPDATE HostRoleCommandEntity command SET command.autoSkipOnFailure = :autoSkipOnFailure WHERE command.requestId = :requestId AND command.roleCommand = :roleCommand"),
+    @NamedQuery(
+        name = "HostRoleCommandEntity.findHostsByCommandStatus",
+        query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, HostEntity host WHERE (command.requestId >= :iLowestRequestIdInProgress AND command.requestId <= :iHighestRequestIdInProgress) AND command.status IN :statuses AND command.hostId = host.hostId AND host.hostName IS NOT NULL"),
+    @NamedQuery(
+        name = "HostRoleCommandEntity.getBlockingHostsForRequest",
+        query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, HostEntity host WHERE command.requestId >= :lowerRequestIdInclusive AND command.requestId < :upperRequestIdExclusive AND command.status IN :statuses AND command.isBackgroundCommand=0 AND command.hostId = host.hostId AND host.hostName IS NOT NULL")
+
 })
 public class HostRoleCommandEntity {
 
@@ -195,6 +202,10 @@ public class HostRoleCommandEntity {
   @OneToOne(mappedBy = "hostRoleCommandEntity", cascade = CascadeType.REMOVE)
   private TopologyLogicalTaskEntity topologyLogicalTaskEntity;
 
+  @Basic
+  @Column(name = "is_background_command", nullable = false)
+  private short isBackgroundCommand = 0;
+
   public Long getTaskId() {
     return taskId;
   }
@@ -407,6 +418,26 @@ public class HostRoleCommandEntity {
     autoSkipOnFailure = skipFailures ? 1 : 0;
   }
 
+  /**
+   * Sets whether this command is a background command and will not block
+   * other commands.
+   *
+   * @param runInBackground
+   *          {@code true} if this is a background command, {@code false}
+   *          otherwise.
+   */
+  public void setBackgroundCommand(boolean runInBackground) {
+    isBackgroundCommand = (short) (runInBackground ? 1 : 0);
+  }
+
+  /**
+   * Gets whether this command runs in the background and will not block other
+   * commands.
+   */
+  public boolean isBackgroundCommand() {
+    return isBackgroundCommand == 0 ? false : true;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/b4a30221/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql
index f007b53..5dde3a7 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql
@@ -388,6 +388,7 @@ CREATE TABLE host_role_command (
   role_command VARCHAR(255),
   command_detail VARCHAR(255),
   custom_command_name VARCHAR(255),
+  is_background_command SMALLINT DEFAULT 0 NOT NULL,
   CONSTRAINT PK_host_role_command PRIMARY KEY (task_id),
   CONSTRAINT FK_host_role_command_host_id FOREIGN KEY (host_id) REFERENCES hosts (host_id),
   CONSTRAINT FK_host_role_command_stage_id FOREIGN KEY (stage_id, request_id) REFERENCES stage (stage_id, request_id));

http://git-wip-us.apache.org/repos/asf/ambari/blob/b4a30221/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
index f6cb896..cc0c175 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
@@ -399,6 +399,7 @@ CREATE TABLE host_role_command (
   structured_out LONGBLOB,
   command_detail VARCHAR(255),
   custom_command_name VARCHAR(255),
+  is_background_command SMALLINT DEFAULT 0 NOT NULL,
   CONSTRAINT PK_host_role_command PRIMARY KEY (task_id),
   CONSTRAINT FK_host_role_command_host_id FOREIGN KEY (host_id) REFERENCES hosts (host_id),
   CONSTRAINT FK_host_role_command_stage_id FOREIGN KEY (stage_id, request_id) REFERENCES stage (stage_id, request_id));

http://git-wip-us.apache.org/repos/asf/ambari/blob/b4a30221/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
index 19253e8..255b695 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
@@ -379,6 +379,7 @@ CREATE TABLE host_role_command (
   structured_out BLOB NULL,
   command_detail VARCHAR2(255) NULL,
   custom_command_name VARCHAR2(255) NULL,
+  is_background_command SMALLINT DEFAULT 0 NOT NULL,
   CONSTRAINT PK_host_role_command PRIMARY KEY (task_id),
   CONSTRAINT FK_host_role_command_host_id FOREIGN KEY (host_id) REFERENCES hosts (host_id),
   CONSTRAINT FK_host_role_command_stage_id FOREIGN KEY (stage_id, request_id) REFERENCES stage (stage_id, request_id));

http://git-wip-us.apache.org/repos/asf/ambari/blob/b4a30221/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
index b13a9e3..12ed51a 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
@@ -388,6 +388,7 @@ CREATE TABLE host_role_command (
   role_command VARCHAR(255),
   command_detail VARCHAR(255),
   custom_command_name VARCHAR(255),
+  is_background_command SMALLINT DEFAULT 0 NOT NULL,
   CONSTRAINT PK_host_role_command PRIMARY KEY (task_id),
   CONSTRAINT FK_host_role_command_host_id FOREIGN KEY (host_id) REFERENCES hosts (host_id),
   CONSTRAINT FK_host_role_command_stage_id FOREIGN KEY (stage_id, request_id) REFERENCES stage (stage_id, request_id));

http://git-wip-us.apache.org/repos/asf/ambari/blob/b4a30221/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql
index cf2954a..afd9d6a 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql
@@ -377,6 +377,7 @@ CREATE TABLE host_role_command (
   structured_out IMAGE,
   command_detail VARCHAR(255),
   custom_command_name VARCHAR(255),
+  is_background_command SMALLINT DEFAULT 0 NOT NULL,
   CONSTRAINT PK_host_role_command PRIMARY KEY (task_id),
   CONSTRAINT FK_host_role_command_host_id FOREIGN KEY (host_id) REFERENCES hosts (host_id),
   CONSTRAINT FK_host_role_command_stage_id FOREIGN KEY (stage_id, request_id) REFERENCES stage (stage_id, request_id));

http://git-wip-us.apache.org/repos/asf/ambari/blob/b4a30221/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
index 16c269a..c541b3a 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
@@ -393,6 +393,7 @@ CREATE TABLE host_role_command (
   role_command VARCHAR(255),
   command_detail VARCHAR(255),
   custom_command_name VARCHAR(255),
+  is_background_command SMALLINT DEFAULT 0 NOT NULL,
   CONSTRAINT PK_host_role_command PRIMARY KEY CLUSTERED (task_id),
   CONSTRAINT FK_host_role_command_host_id FOREIGN KEY (host_id) REFERENCES hosts (host_id),
   CONSTRAINT FK_host_role_command_stage_id FOREIGN KEY (stage_id, request_id) REFERENCES stage (stage_id, request_id));

http://git-wip-us.apache.org/repos/asf/ambari/blob/b4a30221/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index fd22b75..8a7284d 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -195,7 +195,7 @@ public class TestActionScheduler {
 
     Host host = mock(Host.class);
     HashMap<String, ServiceComponentHost> hosts =
-            new HashMap<String, ServiceComponentHost>();
+            new HashMap<>();
     hosts.put(hostname, sch);
     HostEntity hostEntity = new HostEntity();
     hostEntity.setHostName(hostname);
@@ -207,6 +207,7 @@ public class TestActionScheduler {
     when(host.getHostName()).thenReturn(hostname);
 
     ActionDBAccessor db = mock(ActionDBAccessorImpl.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
     Stage s = StageUtils.getATestStage(1, 977, hostname, CLUSTER_HOST_INFO,
       "{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
 
@@ -221,7 +222,7 @@ public class TestActionScheduler {
     //Keep large number of attempts so that the task is not expired finally
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 5, db, aq, fsm,
-        10000, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, null, null);
+        10000, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, hostRoleCommandDAOMock, null);
     scheduler.setTaskTimeoutAdjustment(false);
 
     List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler);
@@ -291,7 +292,7 @@ public class TestActionScheduler {
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
     Host host = mock(Host.class);
     HashMap<String, ServiceComponentHost> hosts =
-            new HashMap<String, ServiceComponentHost>();
+            new HashMap<>();
     hosts.put(hostname, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
 
@@ -312,6 +313,7 @@ public class TestActionScheduler {
     List<Stage> stages = Collections.singletonList(s);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
     when(db.getCommandsInProgressCount()).thenReturn(stages.size());
     when(db.getFirstStageInProgressPerRequest()).thenReturn(stages);
 
@@ -333,7 +335,7 @@ public class TestActionScheduler {
 
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 0, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, null, null);
+        new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, hostRoleCommandDAOMock, null);
     scheduler.setTaskTimeoutAdjustment(false);
     // Start the thread
 
@@ -382,7 +384,7 @@ public class TestActionScheduler {
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
     Host host = mock(Host.class);
     HashMap<String, ServiceComponentHost> hosts =
-            new HashMap<String, ServiceComponentHost>();
+            new HashMap<>();
     hosts.put(hostname, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
 
@@ -396,6 +398,7 @@ public class TestActionScheduler {
     List<Stage> stages = Collections.singletonList(s);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -418,7 +421,7 @@ public class TestActionScheduler {
     //Small action timeout to test rescheduling
     AmbariEventPublisher aep = EasyMock.createNiceMock(AmbariEventPublisher.class);
     ActionScheduler scheduler = new ActionScheduler(100, 0, db, aq, fsm, 3,
-      new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, null, null);
+      new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, hostRoleCommandDAOMock, null);
     scheduler.setTaskTimeoutAdjustment(false);
 
     int cycleCount=0;
@@ -455,7 +458,7 @@ public class TestActionScheduler {
     hostDAO.merge(hostEntity2);
 
     HashMap<String, ServiceComponentHost> hosts =
-            new HashMap<String, ServiceComponentHost>();
+            new HashMap<>();
     hosts.put(hostname1, sch1);
     hosts.put(hostname2, sch2);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
@@ -484,6 +487,7 @@ public class TestActionScheduler {
     final List<Stage> stages = Collections.singletonList(stage);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -538,7 +542,7 @@ public class TestActionScheduler {
     // Make sure the NN install doesn't timeout
     ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
         new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
     scheduler.setTaskTimeoutAdjustment(false);
 
     int cycleCount=0;
@@ -600,6 +604,7 @@ public class TestActionScheduler {
     List<Stage> stages = Collections.singletonList(s);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -652,7 +657,7 @@ public class TestActionScheduler {
     ServerActionExecutor.init(injector);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
 
     int cycleCount = 0;
     while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION")
@@ -690,13 +695,13 @@ public class TestActionScheduler {
     String hostname1 = "ahost.ambari.apache.org";
     String hostname2 = "bhost.ambari.apache.org";
     HashMap<String, ServiceComponentHost> hosts =
-        new HashMap<String, ServiceComponentHost>();
+        new HashMap<>();
     hosts.put(hostname1, sch);
     hosts.put(hostname2, sch);
     hosts.put(Stage.INTERNAL_HOSTNAME, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
 
-    List<Stage> stages = new ArrayList<Stage>();
+    List<Stage> stages = new ArrayList<>();
     Stage stage01 = createStage(clusterName, 0, 1);
     addTask(stage01, Stage.INTERNAL_HOSTNAME, clusterName, Role.AMBARI_SERVER_ACTION, RoleCommand.ACTIONEXECUTE, "AMBARI", 1);
 
@@ -715,6 +720,7 @@ public class TestActionScheduler {
     stages.add(stage12);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -729,7 +735,7 @@ public class TestActionScheduler {
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null),
         unitOfWork, EasyMock.createNiceMock(AmbariEventPublisher.class), conf,
-        entityManagerProviderMock, (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+        entityManagerProviderMock, hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
 
     scheduler.doWork();
 
@@ -756,6 +762,7 @@ public class TestActionScheduler {
     List<Stage> stages = Collections.singletonList(s);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -809,7 +816,7 @@ public class TestActionScheduler {
     ServerActionExecutor.init(injector);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
 
     int cycleCount = 0;
     while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION").isCompletedState()
@@ -927,7 +934,7 @@ public class TestActionScheduler {
     s.getExecutionCommands(null).get(0).getExecutionCommand().setRoleCommand(roleCommand);
 
     aq.enqueue(Stage.INTERNAL_HOSTNAME, s.getExecutionCommands(null).get(0).getExecutionCommand());
-    List<ExecutionCommand> commandsToSchedule = new ArrayList<ExecutionCommand>();
+    List<ExecutionCommand> commandsToSchedule = new ArrayList<>();
     Multimap<String, AgentCommand> commandsToEnqueue = ArrayListMultimap.create();
 
     boolean taskShouldBeSkipped = stageSupportsAutoSkip && autoSkipFailedTask;
@@ -968,6 +975,7 @@ public class TestActionScheduler {
     List<Stage> stages = Collections.singletonList(s);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -1020,7 +1028,7 @@ public class TestActionScheduler {
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
 
     int cycleCount = 0;
     while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION")
@@ -1083,62 +1091,66 @@ public class TestActionScheduler {
     String hostname3 = "chost.ambari.apache.org";
     String hostname4 = "chost.ambari.apache.org";
     HashMap<String, ServiceComponentHost> hosts =
-            new HashMap<String, ServiceComponentHost>();
+            new HashMap<>();
     hosts.put(hostname1, sch);
     hosts.put(hostname2, sch);
     hosts.put(hostname3, sch);
     hosts.put(hostname4, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
 
-    List<Stage> stages = new ArrayList<Stage>();
-    stages.add(
+    List<Stage> firstStageInProgressPerRequest = new ArrayList<>();
+
+    firstStageInProgressPerRequest.add(
             getStageWithSingleTask(
                     hostname1, "cluster1", Role.DATANODE,
                     RoleCommand.START, Service.Type.HDFS, 1, 1, 1));
-    stages.add( // Stage with the same hostname, should not be scheduled
+
+    // Stage with the same hostname, should not be scheduled
+    firstStageInProgressPerRequest.add(
             getStageWithSingleTask(
                     hostname1, "cluster1", Role.GANGLIA_MONITOR,
                     RoleCommand.START, Service.Type.GANGLIA, 2, 2, 2));
 
-    stages.add(
+    firstStageInProgressPerRequest.add(
             getStageWithSingleTask(
                     hostname2, "cluster1", Role.DATANODE,
                     RoleCommand.START, Service.Type.HDFS, 3, 3, 3));
 
-    stages.add(
+    firstStageInProgressPerRequest.add(
         getStageWithSingleTask(
             hostname3, "cluster1", Role.DATANODE,
             RoleCommand.START, Service.Type.HDFS, 4, 4, 4));
 
-    stages.add( // Stage with the same request id, should not be scheduled
-        getStageWithSingleTask(
-            hostname4, "cluster1", Role.GANGLIA_MONITOR,
-            RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4));
-
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+
+    List<String> blockingHostsRequest1 = new ArrayList<>();
+    when(hostRoleCommandDAOMock.getBlockingHostsForRequest(1, 1)).thenReturn(blockingHostsRequest1);
+
+    List<String> blockingHostsRequest2 = Lists.newArrayList(hostname1);
+    when(hostRoleCommandDAOMock.getBlockingHostsForRequest(1, 2)).thenReturn(blockingHostsRequest2);
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
     when(db.getRequestEntity(anyLong())).thenReturn(request);
 
-    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
-    when(db.getFirstStageInProgressPerRequest()).thenReturn(stages);
+    when(db.getCommandsInProgressCount()).thenReturn(firstStageInProgressPerRequest.size());
+    when(db.getFirstStageInProgressPerRequest()).thenReturn(firstStageInProgressPerRequest);
 
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null));
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null));
 
     doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
 
     scheduler.doWork();
 
-    Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(hostname1, "DATANODE"));
-    Assert.assertEquals(HostRoleStatus.PENDING, stages.get(1).getHostRoleStatus(hostname1, "GANGLIA_MONITOR"));
-    Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(2).getHostRoleStatus(hostname2, "DATANODE"));
-    Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(3).getHostRoleStatus(hostname3, "DATANODE"));
-    Assert.assertEquals(HostRoleStatus.PENDING, stages.get(4).getHostRoleStatus(hostname4, "GANGLIA_MONITOR"));
+    Assert.assertEquals(HostRoleStatus.QUEUED, firstStageInProgressPerRequest.get(0).getHostRoleStatus(hostname1, "DATANODE"));
+    Assert.assertEquals(HostRoleStatus.PENDING, firstStageInProgressPerRequest.get(1).getHostRoleStatus(hostname1, "GANGLIA_MONITOR"));
+    Assert.assertEquals(HostRoleStatus.QUEUED, firstStageInProgressPerRequest.get(2).getHostRoleStatus(hostname2, "DATANODE"));
+    Assert.assertEquals(HostRoleStatus.QUEUED, firstStageInProgressPerRequest.get(3).getHostRoleStatus(hostname3, "DATANODE"));
   }
 
 
@@ -1165,22 +1177,22 @@ public class TestActionScheduler {
     String hostname3 = "chost.ambari.apache.org";
     String hostname4 = "chost.ambari.apache.org";
     HashMap<String, ServiceComponentHost> hosts =
-            new HashMap<String, ServiceComponentHost>();
+            new HashMap<>();
     hosts.put(hostname1, sch);
     hosts.put(hostname2, sch);
     hosts.put(hostname3, sch);
     hosts.put(hostname4, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
 
-    List<Stage> stages = new ArrayList<Stage>();
+    List<Stage> stages = new ArrayList<>();
     Stage stage = getStageWithSingleTask(
             hostname1, "cluster1", Role.HIVE_CLIENT,
             RoleCommand.INSTALL, Service.Type.HIVE, 1, 1, 1);
-    Map<String, String> hiveSite = new TreeMap<String, String>();
+    Map<String, String> hiveSite = new TreeMap<>();
     hiveSite.put("javax.jdo.option.ConnectionPassword", "password");
     hiveSite.put("hive.server2.thrift.port", "10000");
     Map<String, Map<String, String>> configurations =
-            new TreeMap<String, Map<String, String>>();
+            new TreeMap<>();
     configurations.put("hive-site", hiveSite);
     stage.getExecutionCommands(hostname1).get(0).getExecutionCommand().setConfigurations(configurations);
     stages.add(stage);
@@ -1206,6 +1218,7 @@ public class TestActionScheduler {
             RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4));
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -1220,7 +1233,7 @@ public class TestActionScheduler {
     ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
             new HostsMap((String) null),
         unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null));
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null));
 
 
     doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
@@ -1256,12 +1269,12 @@ public class TestActionScheduler {
     String hostname1 = "ahost.ambari.apache.org";
     String hostname2 = "bhost.ambari.apache.org";
     HashMap<String, ServiceComponentHost> hosts =
-        new HashMap<String, ServiceComponentHost>();
+        new HashMap<>();
     hosts.put(hostname1, sch);
     hosts.put(hostname2, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
 
-    List<Stage> stages = new ArrayList<Stage>();
+    List<Stage> stages = new ArrayList<>();
     Stage backgroundStage = null;
     stages.add(//stage with background command
         backgroundStage = getStageWithSingleTask(
@@ -1281,6 +1294,7 @@ public class TestActionScheduler {
 
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -1295,7 +1309,7 @@ public class TestActionScheduler {
     ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null),
         unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null));
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null));
 
     doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
 
@@ -1324,7 +1338,7 @@ public class TestActionScheduler {
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
 
     HashMap<String, ServiceComponentHost> hosts =
-            new HashMap<String, ServiceComponentHost>();
+            new HashMap<>();
     hosts.put(hostname, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
 
@@ -1423,7 +1437,7 @@ public class TestActionScheduler {
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
 
-    Capture<Collection<HostRoleCommand>> cancelCommandList = new Capture<Collection<HostRoleCommand>>();
+    Capture<Collection<HostRoleCommand>> cancelCommandList = new Capture<>();
     ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class).
         withConstructor((long)100, (long)50, db, aq, fsm, 3,
           new HostsMap((String) null),
@@ -1441,7 +1455,7 @@ public class TestActionScheduler {
 
     scheduler.doWork();
 
-    List<CommandReport> reports = new ArrayList<CommandReport>();
+    List<CommandReport> reports = new ArrayList<>();
     reports.add(getCommandReport(HostRoleStatus.FAILED, Role.NAMENODE, Service.Type.HDFS, "1-1", 1));
     am.processTaskResponse(hostname, reports, CommandUtils.convertToTaskIdCommandMap(stages.get(0).getOrderedHostRoleCommands()));
 
@@ -1477,7 +1491,7 @@ public class TestActionScheduler {
     String host2 = "host2";
     Host host = mock(Host.class);
     HashMap<String, ServiceComponentHost> hosts =
-            new HashMap<String, ServiceComponentHost>();
+            new HashMap<>();
     hosts.put(host1, sch);
     hosts.put(host2, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
@@ -1538,6 +1552,7 @@ public class TestActionScheduler {
     stage.setLastAttemptTime(host2, Role.HBASE_CLIENT.toString(), now);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -1610,7 +1625,7 @@ public class TestActionScheduler {
     ActionScheduler scheduler = new ActionScheduler(100, 10000, db, aq, fsm, 3,
         new HostsMap((String) null),
         unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
 
     scheduler.doWork();
 
@@ -1689,7 +1704,7 @@ public class TestActionScheduler {
     when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
 
-    final List<Stage> stages = new ArrayList<Stage>();
+    final List<Stage> stages = new ArrayList<>();
 
     long now = System.currentTimeMillis();
     Stage stage = stageFactory.createNew(1, "/tmp", "cluster1", 1L, "testRequestFailureBasedOnSuccessFactor",
@@ -1725,6 +1740,7 @@ public class TestActionScheduler {
             "host1", "cluster1", Role.HDFS_CLIENT, RoleCommand.UPGRADE, Service.Type.HDFS, 4, 2, 1));
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -1804,13 +1820,13 @@ public class TestActionScheduler {
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null),
         unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
 
     ActionManager am = new ActionManager(db, requestFactory, scheduler);
 
     scheduler.doWork();
 
-    List<CommandReport> reports = new ArrayList<CommandReport>();
+    List<CommandReport> reports = new ArrayList<>();
     reports.add(getCommandReport(HostRoleStatus.FAILED, Role.DATANODE, Service.Type.HDFS, "1-1", 1));
     am.processTaskResponse("host1", reports, CommandUtils.convertToTaskIdCommandMap(stage.getOrderedHostRoleCommands()));
 
@@ -1963,7 +1979,7 @@ public class TestActionScheduler {
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
     Host host = mock(Host.class);
     HashMap<String, ServiceComponentHost> hosts =
-            new HashMap<String, ServiceComponentHost>();
+            new HashMap<>();
     hosts.put(hostname, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
 
@@ -1972,6 +1988,7 @@ public class TestActionScheduler {
     when(host.getHostName()).thenReturn(hostname);
 
     ActionDBAccessor db = mock(ActionDBAccessorImpl.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -1989,7 +2006,7 @@ public class TestActionScheduler {
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 100, db, aq, fsm,
         10000, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
     scheduler.setTaskTimeoutAdjustment(false);
 
     List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler);
@@ -2033,10 +2050,12 @@ public class TestActionScheduler {
     when(host1.getHostName()).thenReturn(hostname1);
     when(scomp.getServiceComponentHost(hostname1)).thenReturn(sch1);
     HashMap<String, ServiceComponentHost> hosts =
-            new HashMap<String, ServiceComponentHost>();
+            new HashMap<>();
     hosts.put(hostname1, sch1);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
 
+    HostRoleCommandDAO hostRoleCommandDAO = mock(HostRoleCommandDAO.class);
+
     HostEntity hostEntity = new HostEntity();
     hostEntity.setHostName(hostname1);
     hostDAO.create(hostEntity);
@@ -2072,7 +2091,7 @@ public class TestActionScheduler {
 
     ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
         new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+        hostRoleCommandDAO, (HostRoleCommandFactory) null);
 
     final CountDownLatch abortCalls = new CountDownLatch(2);
 
@@ -2129,6 +2148,7 @@ public class TestActionScheduler {
     List<Stage> stages = Collections.singletonList(s);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -2181,7 +2201,7 @@ public class TestActionScheduler {
     ServerActionExecutor.init(injector);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
 
     int cycleCount = 0;
     while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION")
@@ -2216,7 +2236,7 @@ public class TestActionScheduler {
     hostEntity.setHostName(hostname);
     hostDAO.create(hostEntity);
 
-    HashMap<String, ServiceComponentHost> hosts = new HashMap<String, ServiceComponentHost>();
+    HashMap<String, ServiceComponentHost> hosts = new HashMap<>();
     hosts.put(hostname, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
 
@@ -2291,7 +2311,7 @@ public class TestActionScheduler {
     when(db.getStagesInProgressForRequest(requestId)).thenReturn(stagesInProgress);
     when(db.getAllStages(anyLong())).thenReturn(allStages);
 
-    List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();
+    List<HostRoleCommand> requestTasks = new ArrayList<>();
 
     for (Stage stage : allStages) {
       requestTasks.addAll(stage.getOrderedHostRoleCommands());
@@ -2412,7 +2432,7 @@ public class TestActionScheduler {
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
 
     HashMap<String, ServiceComponentHost> hosts =
-            new HashMap<String, ServiceComponentHost>();
+            new HashMap<>();
     String hostname1 = "hostname1";
     String hostname2 = "hostname2";
     String hostname3 = "hostname3";
@@ -2471,10 +2491,11 @@ public class TestActionScheduler {
     when(host3.getHostName()).thenReturn(hostname);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
     when(db.getCommandsInProgressCount()).thenReturn(stagesInProgress.size());
     when(db.getFirstStageInProgressPerRequest()).thenReturn(firstStageInProgressByRequest);
 
-    List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();
+    List<HostRoleCommand> requestTasks = new ArrayList<>();
     for (Stage stage : stagesInProgress) {
       requestTasks.addAll(stage.getOrderedHostRoleCommands());
     }
@@ -2521,7 +2542,7 @@ public class TestActionScheduler {
       }
     });
 
-    final Map<Long, Boolean> startedRequests = new HashMap<Long, Boolean>();
+    final Map<Long, Boolean> startedRequests = new HashMap<>();
     doAnswer(new Answer<Void>() {
       @Override
       public Void answer(InvocationOnMock invocation) throws Throwable {
@@ -2546,7 +2567,7 @@ public class TestActionScheduler {
 
     ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null));
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null));
 
     doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
 
@@ -2688,7 +2709,7 @@ public class TestActionScheduler {
 
     String hostname1 = "ahost.ambari.apache.org";
 
-    HashMap<String, ServiceComponentHost> hosts = new HashMap<String, ServiceComponentHost>();
+    HashMap<String, ServiceComponentHost> hosts = new HashMap<>();
 
     hosts.put(hostname1, sch);
 
@@ -2723,7 +2744,8 @@ public class TestActionScheduler {
     firstStageInProgress.add(stage);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
-
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+    
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
     when(db.getRequestEntity(anyLong())).thenReturn(request);
@@ -2793,7 +2815,7 @@ public class TestActionScheduler {
 
     ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null));
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null));
 
     doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
 


Mime
View raw message