ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpapirkovs...@apache.org
Subject git commit: AMBARI-4403. Fill start and end time of request entity. (mpapirkovskyy)
Date Thu, 23 Jan 2014 22:58:52 GMT
Updated Branches:
  refs/heads/trunk 5363f8416 -> f792d3d3f


AMBARI-4403. Fill start and end time of request entity. (mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f792d3d3
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f792d3d3
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f792d3d3

Branch: refs/heads/trunk
Commit: f792d3d3f6651d918ee5d201f1057f1c2bc65a3a
Parents: 5363f84
Author: Myroslav Papirkovskyy <mpapyrkovskyy@hortonworks.com>
Authored: Thu Jan 23 02:19:10 2014 +0200
Committer: Myroslav Papirkovskyy <mpapyrkovskyy@hortonworks.com>
Committed: Fri Jan 24 00:55:53 2014 +0200

----------------------------------------------------------------------
 .../server/actionmanager/ActionDBAccessor.java  |  14 +-
 .../actionmanager/ActionDBAccessorImpl.java     | 139 ++++++++++++++-----
 .../server/actionmanager/ActionManager.java     |  12 +-
 .../server/actionmanager/ActionScheduler.java   |  11 +-
 .../server/actionmanager/HostRoleStatus.java    |  45 +++---
 .../ambari/server/actionmanager/Request.java    |  10 ++
 .../server/orm/dao/HostRoleCommandDAO.java      |  27 +++-
 .../ambari/server/orm/dao/RequestDAO.java       |  20 +++
 .../server/orm/entities/RequestEntity.java      |  21 ++-
 .../server/actionmanager/TestActionManager.java |  28 ++--
 .../actionmanager/TestActionScheduler.java      |  63 ++++++---
 11 files changed, 280 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
index c29f6f1..3dfdf66 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
@@ -18,9 +18,7 @@
 package org.apache.ambari.server.actionmanager;
 
 import com.google.inject.persist.Transactional;
-import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.CommandReport;
-import org.apache.ambari.server.controller.ExecuteActionRequest;
 
 import java.util.Collection;
 import java.util.List;
@@ -40,6 +38,13 @@ public interface ActionDBAccessor {
   public List<Stage> getAllStages(long requestId);
 
   /**
+   * Get request object by id
+   * @param requestId
+   * @return
+   */
+  Request getRequest(long requestId);
+
+  /**
    * Abort all outstanding operations associated with the given request
    */
   public void abortOperation(long requestId);
@@ -75,6 +80,11 @@ public interface ActionDBAccessor {
   void setSourceScheduleForRequest(long requestId, long scheduleId);
 
   /**
+   * Update tasks according to command reports
+   */
+  void updateHostRoleStates(Collection<CommandReport> reports);
+
+  /**
    * For the given host, update all the tasks based on the command report
    */
   public void updateHostRoleState(String hostname, long requestId,

http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index bc7ea8f..96a3a0e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -20,21 +20,41 @@ package org.apache.ambari.server.actionmanager;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.inject.Inject;
-import com.google.inject.Injector;
 import com.google.inject.Singleton;
 import com.google.inject.name.Named;
 import com.google.inject.persist.Transactional;
-import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.CommandReport;
-import org.apache.ambari.server.controller.ExecuteActionRequest;
-import org.apache.ambari.server.orm.dao.*;
-import org.apache.ambari.server.orm.entities.*;
-import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.orm.dao.ClusterDAO;
+import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
+import org.apache.ambari.server.orm.dao.HostDAO;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
+import org.apache.ambari.server.orm.dao.RequestDAO;
+import org.apache.ambari.server.orm.dao.RequestScheduleDAO;
+import org.apache.ambari.server.orm.dao.RoleSuccessCriteriaDAO;
+import org.apache.ambari.server.orm.dao.StageDAO;
+import org.apache.ambari.server.orm.entities.ClusterEntity;
+import org.apache.ambari.server.orm.entities.ExecutionCommandEntity;
+import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
+import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.orm.entities.RequestScheduleEntity;
+import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
 import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.utils.StageUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 @Singleton
@@ -107,20 +127,24 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     return stages;
   }
 
+  @Override
+  public Request getRequest(long requestId) {
+    RequestEntity requestEntity = requestDAO.findByPK(requestId);
+    if (requestEntity != null) {
+      return requestFactory.createExisting(requestEntity);
+    } else {
+      return null;
+    }
+  }
+
   /* (non-Javadoc)
    * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#abortOperation(long)
    */
   @Override
-  @Transactional
   public void abortOperation(long requestId) {
     long now = System.currentTimeMillis();
 
-    //mark request as ended
-    RequestEntity requestEntity = requestDAO.findByPK(requestId);
-    if (requestEntity != null && requestEntity.getEndTime() == -1L) {
-      requestEntity.setEndTime(now);
-      requestDAO.merge(requestEntity);
-    }
+    endRequest(requestId);
 
     List<HostRoleCommandEntity> commands =
         hostRoleCommandDAO.findByRequest(requestId);
@@ -130,7 +154,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
           command.getStatus() == HostRoleStatus.PENDING) {
         command.setStatus(HostRoleStatus.ABORTED);
         command.setEndTime(now);
-        hostRoleCommandDAO.merge(command);
         LOG.info("Aborting command. Hostname " + command.getHostName()
             + " role " + command.getRole()
             + " requestId " + command.getRequestId()
@@ -138,13 +161,14 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
             + " stageId " + command.getStageId());
       }
     }
+
+    hostRoleCommandDAO.mergeAll(commands);
   }
 
   /* (non-Javadoc)
    * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#timeoutHostRole(long, long,
org.apache.ambari.server.Role)
    */
   @Override
-  @Transactional
   public void timeoutHostRole(String host, long requestId, long stageId,
                               String role) {
     long now = System.currentTimeMillis();
@@ -153,8 +177,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     for (HostRoleCommandEntity command : commands) {
       command.setStatus(HostRoleStatus.TIMEDOUT);
       command.setEndTime(now);
-      hostRoleCommandDAO.merge(command);
     }
+    hostRoleCommandDAO.mergeAll(commands);
+    endRequestIfCompleted(requestId);
   }
 
   /* (non-Javadoc)
@@ -165,7 +190,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     List<Stage> stages = new ArrayList<Stage>();
     List<HostRoleStatus> statuses =
         Arrays.asList(HostRoleStatus.QUEUED, HostRoleStatus.IN_PROGRESS,
-            HostRoleStatus.PENDING);
+          HostRoleStatus.PENDING);
     for (StageEntity stageEntity : stageDAO.findByCommandStatuses(statuses)) {
       stages.add(stageFactory.createExisting(stageEntity));
     }
@@ -234,27 +259,30 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     }
     requestEntity.setStages(stageEntities);
     requestDAO.merge(requestEntity);
-//    requestDAO.create(requestEntity);
   }
 
   @Override
-  @Transactional
   public void startRequest(long requestId) {
     RequestEntity requestEntity = requestDAO.findByPK(requestId);
     if (requestEntity != null && requestEntity.getStartTime() == -1L) {
       requestEntity.setStartTime(System.currentTimeMillis());
+      requestDAO.merge(requestEntity);
     }
-    requestDAO.merge(requestEntity);
   }
 
   @Override
-  @Transactional
   public void endRequest(long requestId) {
     RequestEntity requestEntity = requestDAO.findByPK(requestId);
     if (requestEntity != null && requestEntity.getEndTime() == -1L) {
       requestEntity.setEndTime(System.currentTimeMillis());
+      requestDAO.merge(requestEntity);
+    }
+  }
+
+  public void endRequestIfCompleted(long requestId) {
+    if (requestDAO.isAllTasksCompleted(requestId)) {
+      endRequest(requestId);
     }
-    requestDAO.merge(requestEntity);
   }
 
   @Override
@@ -285,30 +313,76 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   }
 
   @Override
-  @Transactional
+  public void updateHostRoleStates(Collection<CommandReport> reports) {
+    Map<Long, CommandReport> taskReports = new HashMap<Long, CommandReport>();
+    for (CommandReport report : reports) {
+      taskReports.put(report.getTaskId(), report);
+    }
+
+    long now = System.currentTimeMillis();
+
+    List<Long> requestsToCheck = new ArrayList<Long>();
+
+    List<HostRoleCommandEntity> commandEntities = hostRoleCommandDAO.findByPKs(taskReports.keySet());
+    for (HostRoleCommandEntity commandEntity : commandEntities) {
+      CommandReport report = taskReports.get(commandEntity.getTaskId());
+      commandEntity.setStatus(HostRoleStatus.valueOf(report.getStatus()));
+      commandEntity.setStdOut(report.getStdOut().getBytes());
+      commandEntity.setStdError(report.getStdErr().getBytes());
+      commandEntity.setStructuredOut(report.getStructuredOut() == null ? null :
+        report.getStructuredOut().getBytes());
+      commandEntity.setExitcode(report.getExitCode());
+
+      if (HostRoleStatus.getCompletedStates().contains(commandEntity.getStatus())) {
+        commandEntity.setEndTime(now);
+
+        String actionId = report.getActionId();
+        long[] requestStageIds = StageUtils.getRequestStage(actionId);
+        long requestId = requestStageIds[0];
+        long stageId = requestStageIds[1];
+        if (requestDAO.getLastStageId(requestId).equals(stageId)) {
+          requestsToCheck.add(requestId);
+        }
+      }
+    }
+
+    hostRoleCommandDAO.mergeAll(commandEntities);
+
+    for (Long requestId : requestsToCheck) {
+      endRequestIfCompleted(requestId);
+    }
+  }
+
+  @Override
   public void updateHostRoleState(String hostname, long requestId,
                                   long stageId, String role, CommandReport report) {
+    boolean checkRequest = false;
     if (LOG.isDebugEnabled()) {
       LOG.debug("Update HostRoleState: "
-          + "HostName " + hostname + " requestId " + requestId + " stageId "
-          + stageId + " role " + role + " report " + report);
+        + "HostName " + hostname + " requestId " + requestId + " stageId "
+        + stageId + " role " + role + " report " + report);
     }
     long now = System.currentTimeMillis();
     List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByHostRole(
-        hostname, requestId, stageId, role);
+      hostname, requestId, stageId, role);
     for (HostRoleCommandEntity command : commands) {
       command.setStatus(HostRoleStatus.valueOf(report.getStatus()));
       command.setStdOut(report.getStdOut().getBytes());
       command.setStdError(report.getStdErr().getBytes());
       command.setStructuredOut(report.getStructuredOut() == null ? null :
-        report.getStructuredOut().getBytes());           // ===================================
-      if (command.getStatus() == HostRoleStatus.COMPLETED ||
-          command.getStatus() == HostRoleStatus.ABORTED ||
-          command.getStatus() == HostRoleStatus.FAILED) {
+        report.getStructuredOut().getBytes());
+      if (HostRoleStatus.getCompletedStates().contains(command.getStatus())) {
         command.setEndTime(now);
+        if (requestDAO.getLastStageId(requestId).equals(stageId)) {
+          checkRequest = true;
+        }
       }
       command.setExitcode(report.getExitCode());
-      hostRoleCommandDAO.merge(command);
+    }
+    hostRoleCommandDAO.mergeAll(commands);
+
+    if (checkRequest) {
+      endRequestIfCompleted(requestId);
     }
   }
 
@@ -473,7 +547,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   }
 
   @Override
-  @Transactional
   public List<Request> getRequests(Collection<Long> requestIds){
     List<RequestEntity> requestEntities = requestDAO.findByPks(requestIds);
     List<Request> requests = new ArrayList<Request>(requestEntities.size());

http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
index 3788d75..789bbd6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
@@ -33,6 +33,7 @@ import org.apache.ambari.server.utils.StageUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -123,15 +124,13 @@ public class ActionManager {
     if (reports == null) {
       return;
     }
+
+    List<CommandReport> reportsToProcess = new ArrayList<CommandReport>();
     //persist the action response into the db.
     for (CommandReport report : reports) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Processing command report : " + report.toString());
       }
-      String actionId = report.getActionId();
-      long[] requestStageIds = StageUtils.getRequestStage(actionId);
-      long requestId = requestStageIds[0];
-      long stageId = requestStageIds[1];
       HostRoleCommand command = db.getTask(report.getTaskId());
       if (command == null) {
         LOG.warn("The task " + report.getTaskId()
@@ -144,9 +143,10 @@ public class ActionManager {
             + " is not in progress, ignoring update");
         continue;
       }
-      db.updateHostRoleState(hostname, requestId, stageId, report.getRole(),
-          report);
+      reportsToProcess.add(report);
     }
+
+    db.updateHostRoleStates(reportsToProcess);
   }
 
   public void handleLostHost(String host) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 259b970..601930d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -79,6 +79,8 @@ class ActionScheduler implements Runnable {
   private final ServerActionManager serverActionManager;
   private final Configuration configuration;
 
+  private final Set<String> requestsInProgress = new HashSet<String>();
+
   /**
    * true if scheduler should run ASAP.
    * We need this flag to avoid sleep in situations, when
@@ -145,8 +147,10 @@ class ActionScheduler implements Runnable {
         shouldRun = false;
       } catch (Exception ex) {
         LOG.warn("Exception received", ex);
+        requestsInProgress.clear();
       } catch (Throwable t) {
         LOG.warn("ERROR", t);
+        requestsInProgress.clear();
       }
     }
   }
@@ -179,9 +183,12 @@ class ActionScheduler implements Runnable {
           continue;
         } else {
           runningRequestIds.add(requestIdStr);
+          if (!requestsInProgress.contains(requestIdStr)) {
+            requestsInProgress.add(requestIdStr);
+            db.startRequest(requestId);
+          }
         }
 
-
         List<String> stageHosts = s.getHosts();
         boolean conflict = false;
         for (String host : stageHosts) {
@@ -251,6 +258,8 @@ class ActionScheduler implements Runnable {
         }
       }
 
+      requestsInProgress.retainAll(runningRequestIds);
+
     } finally {
       unitOfWork.end();
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java
index 84b8f93..039579f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java
@@ -17,6 +17,10 @@
  */
 package org.apache.ambari.server.actionmanager;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
 public enum HostRoleStatus {
   PENDING(0), //Not queued for a host
   QUEUED(1), //Queued for a host
@@ -27,6 +31,10 @@ public enum HostRoleStatus {
   ABORTED(6); //Operation was abandoned
   private final int status;
 
+  private static List<HostRoleStatus> COMPLETED_STATES = Arrays.asList(FAILED, TIMEDOUT,
ABORTED, COMPLETED);
+  private static List<HostRoleStatus> FAILED_STATES = Arrays.asList(FAILED, TIMEDOUT,
ABORTED);
+
+
   private HostRoleStatus(int status) {
     this.status = status;
   }
@@ -37,14 +45,7 @@ public enum HostRoleStatus {
    * @return true if this is a valid failure state.
    */
   public boolean isFailedState() {
-    switch (HostRoleStatus.values()[this.status]) {
-      case FAILED:
-      case TIMEDOUT:
-      case ABORTED:
-        return true;
-      default:
-        return false;
-    }
+    return FAILED_STATES.contains(this);
   }
 
   /**
@@ -56,14 +57,24 @@ public enum HostRoleStatus {
    * @return true if this is a completed state.
    */
   public boolean isCompletedState() {
-    switch (HostRoleStatus.values()[this.status]) {
-      case COMPLETED:
-      case FAILED:
-      case TIMEDOUT:
-      case ABORTED:
-        return true;
-      default:
-        return false;
-    }
+    return COMPLETED_STATES.contains(this);
+  }
+
+  /**
+   *
+   * @return list of completed states
+   */
+  public static List<HostRoleStatus> getCompletedStates() {
+    return Collections.unmodifiableList(COMPLETED_STATES);
   }
+
+  /**
+   *
+   * @return list of failed states
+   */
+  public static List<HostRoleStatus> getFailedStates() {
+    return Collections.unmodifiableList(FAILED_STATES);
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
index 5116ea9..d1047a7 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
@@ -45,6 +45,7 @@ public class Request {
   private long createTime;
   private long startTime;
   private long endTime;
+  private HostRoleStatus status; // not persisted yet
   private String inputs;
   private String targetService;
   private String targetComponent;
@@ -142,6 +143,7 @@ public class Request {
     this.targetHosts = entity.getTargetHosts();
     this.requestType = entity.getRequestType();
     this.commandName = entity.getCommandName();
+    this.status = entity.getStatus();
     if (entity.getRequestScheduleEntity() !=null) {
       this.requestScheduleId = entity.getRequestScheduleEntity().getScheduleId();
     }
@@ -303,4 +305,12 @@ public class Request {
         ", stages=" + stages +
         '}';
   }
+
+  public HostRoleStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(HostRoleStatus status) {
+    this.status = status;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index e68b974..61e2fc2 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -22,7 +22,6 @@ import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
 import com.google.inject.persist.Transactional;
-import org.apache.ambari.server.Role;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.orm.entities.HostEntity;
 import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
@@ -31,10 +30,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.persistence.EntityManager;
-import javax.persistence.Query;
 import javax.persistence.TypedQuery;
-
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 @Singleton
 public class HostRoleCommandDAO {
@@ -53,10 +55,13 @@ public class HostRoleCommandDAO {
 
   @Transactional
   public List<HostRoleCommandEntity> findByPKs(Collection<Long> taskIds) {
+    if (taskIds == null || taskIds.isEmpty()) {
+      return Collections.emptyList();
+    }
     TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery(
-        "SELECT task FROM HostRoleCommandEntity task WHERE task.taskId IN ?1 " +
-            "ORDER BY task.taskId",
-        HostRoleCommandEntity.class);
+      "SELECT task FROM HostRoleCommandEntity task WHERE task.taskId IN ?1 " +
+        "ORDER BY task.taskId",
+      HostRoleCommandEntity.class);
     return daoUtils.selectList(query, taskIds);
   }
 
@@ -204,7 +209,15 @@ public class HostRoleCommandDAO {
   public HostRoleCommandEntity merge(HostRoleCommandEntity stageEntity) {
     HostRoleCommandEntity entity = entityManagerProvider.get().merge(stageEntity);
     return entity;
+  }
 
+  @Transactional
+  public List<HostRoleCommandEntity> mergeAll(Collection<HostRoleCommandEntity>
entities) {
+    List<HostRoleCommandEntity> managedList = new ArrayList<HostRoleCommandEntity>(entities.size());
+    for (HostRoleCommandEntity entity : entities) {
+      managedList.add(entityManagerProvider.get().merge(entity));
+    }
+    return managedList;
   }
 
   @Transactional

http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
index 483550f..7a2c836 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
@@ -21,6 +21,7 @@ package org.apache.ambari.server.orm.dao;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.persist.Transactional;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.orm.entities.RequestEntity;
 
 import javax.persistence.EntityManager;
@@ -47,6 +48,25 @@ public class RequestDAO {
   }
 
   @Transactional
+  public boolean isAllTasksCompleted(long requestId) {
+    TypedQuery<Long> query = entityManagerProvider.get().createQuery(
+        "SELECT task.taskId FROM HostRoleCommandEntity task WHERE task.requestId = ?1 AND
" +
+          "task.stageId=(select max(stage.stageId) FROM StageEntity stage WHERE stage.requestId=?1)
" +
+          "AND task.status NOT IN ?2",
+        Long.class
+    );
+    query.setMaxResults(1); //we don't need all
+    return daoUtils.selectList(query, requestId, HostRoleStatus.getCompletedStates()).isEmpty();
+  }
+
+  @Transactional
+  public Long getLastStageId(long requestId) {
+    TypedQuery<Long> query = entityManagerProvider.get().createQuery("SELECT max(stage.stageId)
" +
+      "FROM StageEntity stage WHERE stage.requestId=?1", Long.class);
+    return daoUtils.selectSingle(query, requestId);
+  }
+
+  @Transactional
   public void create(RequestEntity requestEntity) {
     entityManagerProvider.get().persist(requestEntity);
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
index 1fe763b..072b4ed 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
@@ -18,9 +18,21 @@
 
 package org.apache.ambari.server.orm.entities;
 
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.actionmanager.RequestType;
 
-import javax.persistence.*;
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.Lob;
+import javax.persistence.ManyToOne;
+import javax.persistence.OneToMany;
+import javax.persistence.Table;
 import java.util.Collection;
 
 @Table(name = "request")
@@ -68,7 +80,8 @@ public class RequestEntity {
   private RequestType requestType;
 
   @Column(name = "status")
-  private String status;
+  @Enumerated(value = EnumType.STRING)
+  private HostRoleStatus status;
 
   @Basic
   @Column(name = "create_time", nullable = false)
@@ -205,11 +218,11 @@ public class RequestEntity {
     this.commandName = commandName;
   }
 
-  public String getStatus() {
+  public HostRoleStatus getStatus() {
     return status;
   }
 
-  public void setStatus(String status) {
+  public void setStatus(HostRoleStatus status) {
     this.status = status;
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
index b749ea0..9cb2199 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
@@ -17,21 +17,11 @@
  */
 package org.apache.ambari.server.actionmanager;
 
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.createStrictMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.persist.PersistService;
 import com.google.inject.persist.UnitOfWork;
 import junit.framework.Assert;
-
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
@@ -40,7 +30,6 @@ import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.controller.HostsMap;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
-import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartEvent;
 import org.apache.ambari.server.utils.StageUtils;
@@ -49,9 +38,12 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.persist.PersistService;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.*;
 
 public class TestActionManager {
 
@@ -119,6 +111,8 @@ public class TestActionManager {
       "STRUCTURED_OUTPUT",
       am.getAction(requestId, stageId)
         .getHostRoleCommand(hostname, "HBASE_MASTER").getStructuredOut());
+
+    assertFalse(db.getRequest(requestId).getEndTime() == -1);
   }
   
   @Test

http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index 696d2a6..de6c98b 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -23,9 +23,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 import java.lang.reflect.Type;
 import java.util.ArrayList;
@@ -216,6 +214,9 @@ public class TestActionScheduler {
     assertEquals(stages.get(0).getHostRoleStatus(hostname, "NAMENODE"),
         HostRoleStatus.TIMEDOUT);
 
+    verify(db, times(1)).startRequest(eq(1L));
+    verify(db, times(1)).abortOperation(1L);
+
     scheduler.stop();
   }
 
@@ -587,21 +588,29 @@ public class TestActionScheduler {
     doAnswer(new Answer() {
       @Override
       public Object answer(InvocationOnMock invocation) throws Throwable {
-        String host = (String) invocation.getArguments()[0];
-        Long requestId = (Long) invocation.getArguments()[1];
-        Long stageId = (Long) invocation.getArguments()[2];
-        String role = (String) invocation.getArguments()[3];
-        CommandReport commandReport = (CommandReport) invocation.getArguments()[4];
-        for (Stage stage : stages) {
-          if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId()))
{
-            HostRoleCommand command = stage.getHostRoleCommand(host, role);
-            command.setStatus(HostRoleStatus.valueOf(commandReport.getStatus()));
+        List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0];
+        for (CommandReport report : reports) {
+          String actionId = report.getActionId();
+          long[] requestStageIds = StageUtils.getRequestStage(actionId);
+          Long requestId = requestStageIds[0];
+          Long stageId = requestStageIds[1];
+          String role = report.getRole();
+          Long id = report.getTaskId();
+          for (Stage stage : stages) {
+            if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId()))
{
+              for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands())
{
+                if (hostRoleCommand.getTaskId() == id) {
+                  hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus()));
+                }
+              }
+            }
           }
+
         }
 
         return null;
       }
-    }).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class));
+    }).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class));
 
     when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() {
       @Override
@@ -906,21 +915,29 @@ public class TestActionScheduler {
     doAnswer(new Answer() {
       @Override
       public Object answer(InvocationOnMock invocation) throws Throwable {
-        String host = (String) invocation.getArguments()[0];
-        Long requestId = (Long) invocation.getArguments()[1];
-        Long stageId = (Long) invocation.getArguments()[2];
-        String role = (String) invocation.getArguments()[3];
-        CommandReport commandReport = (CommandReport) invocation.getArguments()[4];
-        for (Stage stage : stages) {
-          if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId()))
{
-            HostRoleCommand command = stage.getHostRoleCommand(host, role);
-            command.setStatus(HostRoleStatus.valueOf(commandReport.getStatus()));
+        List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0];
+        for (CommandReport report : reports) {
+          String actionId = report.getActionId();
+          long[] requestStageIds = StageUtils.getRequestStage(actionId);
+          Long requestId = requestStageIds[0];
+          Long stageId = requestStageIds[1];
+          String role = report.getRole();
+          Long id = report.getTaskId();
+          for (Stage stage : stages) {
+            if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId()))
{
+              for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands())
{
+                if (hostRoleCommand.getTaskId() == id) {
+                  hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus()));
+                }
+              }
+            }
           }
+
         }
 
         return null;
       }
-    }).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class));
+    }).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class));
 
     when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() {
       @Override


Mime
View raw message