ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jspei...@apache.org
Subject [2/2] ambari git commit: AMBARI-7985. Add server side command functionality. Allow tasks to be executed on the Ambari Server host.
Date Mon, 17 Nov 2014 17:26:03 GMT
AMBARI-7985.  Add server side command functionality.
Allow tasks to be executed on the Ambari Server host.


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/3d397dc0
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/3d397dc0
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/3d397dc0

Branch: refs/heads/trunk
Commit: 3d397dc04aab8d81c00aae2a8e5afa099fc57567
Parents: cc076cf
Author: Robert Levas <rlevas@hortonworks.com>
Authored: Mon Nov 17 12:22:15 2014 -0500
Committer: John Speidel <jspeidel@hortonworks.com>
Committed: Mon Nov 17 12:25:43 2014 -0500

----------------------------------------------------------------------
 .../server/actionmanager/ActionDBAccessor.java  |   5 +
 .../actionmanager/ActionDBAccessorImpl.java     |   7 +-
 .../server/actionmanager/ActionManager.java     |   5 +-
 .../server/actionmanager/ActionScheduler.java   |  97 +---
 .../ambari/server/actionmanager/Stage.java      |  39 +-
 .../server/controller/ControllerModule.java     |   3 -
 .../server/orm/dao/HostRoleCommandDAO.java      |  11 +
 .../serveraction/AbstractServerAction.java      | 138 +++++
 .../server/serveraction/ServerAction.java       |  72 ++-
 .../serveraction/ServerActionExecutor.java      | 525 +++++++++++++++++++
 .../serveraction/ServerActionManager.java       |  32 --
 .../serveraction/ServerActionManagerImpl.java   |  74 ---
 .../server/state/ServiceComponentHostEvent.java |   2 +
 .../state/ServiceComponentHostEventType.java    |   6 +-
 .../ServiceComponentHostServerActionEvent.java  |  76 +++
 .../ServiceComponentHostUpgradeEvent.java       |   3 +-
 .../actionmanager/TestActionDBAccessorImpl.java |  80 ++-
 .../server/actionmanager/TestActionManager.java |   6 +-
 .../actionmanager/TestActionScheduler.java      | 395 +++++++++-----
 .../server/agent/TestHeartbeatHandler.java      |   6 +-
 .../AmbariManagementControllerTest.java         |  25 +-
 .../server/serveraction/MockServerAction.java   |  92 ++++
 .../serveraction/ServerActionExecutorTest.java  | 248 +++++++++
 23 files changed, 1581 insertions(+), 366 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
index 1f99b4a..6ff365b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
@@ -149,6 +149,11 @@ public interface ActionDBAccessor {
   public Collection<HostRoleCommand> getTasks(Collection<Long> taskIds);
 
   /**
+   * Get a List of host role commands where the role and status are as specified
+   */
+  public List<HostRoleCommand> getTasksByHostRoleAndStatus(String hostname, String role, HostRoleStatus status);
+
+  /**
    * Get all stages that contain tasks with specified host role statuses
    */
   public List<Stage> getStagesByHostRoleStatus(Set<HostRoleStatus> statuses);

http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 5e879cc..e8be3cc 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -212,7 +212,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     if (clusterEntity != null) {
       clusterId = clusterEntity.getClusterId();
     }
-    
+
     requestEntity.setClusterId(clusterId);
     requestDAO.create(requestEntity);
 
@@ -550,6 +550,11 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   }
 
   @Override
+  public List<HostRoleCommand> getTasksByHostRoleAndStatus(String hostname, String role, HostRoleStatus status) {
+    return getTasks(hostRoleCommandDAO.findTaskIdsByHostRoleAndStatus(hostname, role, status));
+  }
+
+  @Override
   public List<Stage> getStagesByHostRoleStatus(Set<HostRoleStatus> statuses) {
     List<Stage> stages = new ArrayList<Stage>();
     for (StageEntity stageEntity : stageDAO.findByCommandStatuses(statuses)) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
index e2fad5f..6d1d87f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
@@ -28,7 +28,6 @@ import org.apache.ambari.server.api.services.BaseRequest;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.ExecuteActionRequest;
 import org.apache.ambari.server.controller.HostsMap;
-import org.apache.ambari.server.serveraction.ServerActionManager;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.utils.StageUtils;
 import org.slf4j.Logger;
@@ -60,12 +59,12 @@ public class ActionManager {
   public ActionManager(@Named("schedulerSleeptime") long schedulerSleepTime,
                        @Named("actionTimeout") long actionTimeout,
                        ActionQueue aq, Clusters fsm, ActionDBAccessor db, HostsMap hostsMap,
-                       ServerActionManager serverActionManager, UnitOfWork unitOfWork,
+                       UnitOfWork unitOfWork,
                        RequestFactory requestFactory, Configuration configuration) {
     this.actionQueue = aq;
     this.db = db;
     scheduler = new ActionScheduler(schedulerSleepTime, actionTimeout, db,
-        actionQueue, fsm, 2, hostsMap, serverActionManager, unitOfWork, configuration);
+        actionQueue, fsm, 2, hostsMap, unitOfWork, configuration);
     requestCounter = new AtomicLong(
         db.getLastPersistedRequestIdWhenInitialized());
     this.requestFactory = requestFactory;

http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 81fee75..c23440e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -43,8 +43,7 @@ import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.agent.ExecutionCommand;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.HostsMap;
-import org.apache.ambari.server.serveraction.ServerAction;
-import org.apache.ambari.server.serveraction.ServerActionManager;
+import org.apache.ambari.server.serveraction.ServerActionExecutor;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Host;
@@ -93,7 +92,7 @@ class ActionScheduler implements Runnable {
   private boolean taskTimeoutAdjustment = true;
   private final HostsMap hostsMap;
   private final Object wakeupSyncObject = new Object();
-  private final ServerActionManager serverActionManager;
+  private final ServerActionExecutor serverActionExecutor;
   private final Configuration configuration;
 
   private final Set<Long> requestsInProgress = new HashSet<Long>();
@@ -126,7 +125,7 @@ class ActionScheduler implements Runnable {
 
   public ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec,
       ActionDBAccessor db, ActionQueue actionQueue, Clusters fsmObject,
-      int maxAttempts, HostsMap hostsMap, ServerActionManager serverActionManager,
+      int maxAttempts, HostsMap hostsMap,
       UnitOfWork unitOfWork, Configuration configuration) {
     this.sleepTime = sleepTimeMilliSec;
     this.hostsMap = hostsMap;
@@ -135,7 +134,7 @@ class ActionScheduler implements Runnable {
     this.actionQueue = actionQueue;
     this.fsmObject = fsmObject;
     this.maxAttempts = (short) maxAttempts;
-    this.serverActionManager = serverActionManager;
+    this.serverActionExecutor = new ServerActionExecutor(db, sleepTimeMilliSec);
     this.unitOfWork = unitOfWork;
     this.clusterHostInfoCache = CacheBuilder.newBuilder().
         expireAfterAccess(5, TimeUnit.MINUTES).
@@ -152,11 +151,19 @@ class ActionScheduler implements Runnable {
   public void start() {
     schedulerThread = new Thread(this);
     schedulerThread.start();
+
+    // Start up the ServerActionExecutor. Since it is directly related to the ActionScheduler it
+    // should be started and stopped along with it.
+    serverActionExecutor.start();
   }
 
   public void stop() {
     shouldRun = false;
     schedulerThread.interrupt();
+
+    // Stop the ServerActionExecutor. Since it is directly related to the ActionScheduler it should
+    // be started and stopped along with it.
+    serverActionExecutor.stop();
   }
 
   /**
@@ -216,7 +223,7 @@ class ActionScheduler implements Runnable {
         return;
       }
       int i_stage = 0;
-      
+
       stages = filterParallelPerHostStages(stages);
 
       boolean exclusiveRequestIsGoing = false;
@@ -285,18 +292,7 @@ class ActionScheduler implements Runnable {
         //Schedule what we have so far
 
         for (ExecutionCommand cmd : commandsToSchedule) {
-          if (Role.valueOf(cmd.getRole()).equals(Role.AMBARI_SERVER_ACTION)) {
-            /**
-             * We don't forbid executing any stages in parallel with
-             * AMBARI_SERVER_ACTION. That  should be OK as AMBARI_SERVER_ACTION
-             * is not used as of now. The general motivation has been to update
-             * Request status when last task associated with the
-             * Request is finished.
-             */
-            executeServerAction(s, cmd);
-          } else {
             processHostRole(s, cmd, commandsToStart, commandsToUpdate);
-          }
         }
 
         LOG.debug("==> Commands to start: {}", commandsToStart.size());
@@ -347,7 +343,12 @@ class ActionScheduler implements Runnable {
 
         LOG.debug("==> Adding {} tasks to queue...", commandsToUpdate.size());
         for (ExecutionCommand cmd : commandsToUpdate) {
-          actionQueue.enqueue(cmd.getHostname(), cmd);
+          // Do not queue up server actions; however if we encounter one, wake up the ServerActionExecutor
+          if (Role.AMBARI_SERVER_ACTION.toString().equals(cmd.getRole())) {
+            serverActionExecutor.awake();
+          } else {
+            actionQueue.enqueue(cmd.getHostname(), cmd);
+          }
         }
         LOG.debug("==> Finished.");
 
@@ -403,32 +404,6 @@ class ActionScheduler implements Runnable {
     return true;
   }
 
-  /**
-   * Executes internal ambari-server action
-   */
-  private void executeServerAction(Stage s, ExecutionCommand cmd) {
-    try {
-      LOG.trace("Executing server action: request_id={}, stage_id={}, task_id={}",
-        s.getRequestId(), s.getStageId(), cmd.getTaskId());
-      long now = System.currentTimeMillis();
-      String hostName = cmd.getHostname();
-      String roleName = cmd.getRole();
-
-      s.setStartTime(hostName, roleName, now);
-      s.setLastAttemptTime(hostName, roleName, now);
-      s.incrementAttemptCount(hostName, roleName);
-      s.setHostRoleStatus(hostName, roleName, HostRoleStatus.QUEUED);
-      db.hostRoleScheduled(s, hostName, roleName);
-      String actionName = cmd.getRoleParams().get(ServerAction.ACTION_NAME);
-      this.serverActionManager.executeAction(actionName, cmd.getCommandParams());
-      reportServerActionSuccess(s, cmd);
-
-    } catch (AmbariException e) {
-      LOG.warn("Could not execute server action " + cmd.toString(), e);
-      reportServerActionFailure(s, cmd, e.getMessage());
-    }
-  }
-
   private boolean hasPreviousStageFailed(Stage stage) {
     boolean failed = false;
     long prevStageId = stage.getStageId() - 1;
@@ -477,26 +452,6 @@ class ActionScheduler implements Runnable {
     return failed;
   }
 
-  private void reportServerActionSuccess(Stage stage, ExecutionCommand cmd) {
-    CommandReport report = new CommandReport();
-    report.setStatus(HostRoleStatus.COMPLETED.toString());
-    report.setExitCode(0);
-    report.setStdOut("Server action succeeded");
-    report.setStdErr("");
-    db.updateHostRoleState(cmd.getHostname(), stage.getRequestId(), stage.getStageId(),
-            cmd.getRole(), report);
-  }
-
-  private void reportServerActionFailure(Stage stage, ExecutionCommand cmd, String message) {
-    CommandReport report = new CommandReport();
-    report.setStatus(HostRoleStatus.FAILED.toString());
-    report.setExitCode(1);
-    report.setStdOut("Server action failed");
-    report.setStdErr(message);
-    db.updateHostRoleState(cmd.getHostname(), stage.getRequestId(), stage.getStageId(),
-            cmd.getRole(), report);
-  }
-
   /**
    * @return Stats for the roles in the stage. It is used to determine whether stage
    * has succeeded or failed.
@@ -569,12 +524,12 @@ class ActionScheduler implements Runnable {
 
         // Check that service host component is not deleted
         if (hostDeleted) {
-          
+
           String message = String.format(
             "Host not found when trying to schedule an execution command. " +
             "The most probable reason for that is that host or host component " +
             "has been deleted recently. The command has been aborted and dequeued." +
-            "Execution command details: " + 
+            "Execution command details: " +
             "cmdId: %s; taskId: %s; roleCommand: %s",
             c.getCommandId(), c.getTaskId(), c.getRoleCommand());
           LOG.warn("Host {} has been detected as non-available. {}", host, message);
@@ -772,7 +727,7 @@ class ActionScheduler implements Runnable {
     }
 
     cmd.setClusterHostInfo(clusterHostInfo);
- 
+
     //Try to get commandParams from cache and merge them with command-level parameters
     Map<String, String> commandParams = commandParamsStageCache.getIfPresent(stagePk);
 
@@ -888,12 +843,16 @@ class ActionScheduler implements Runnable {
       LOG.error("Unknown status " + status.name());
     }
   }
-  
-  
+
+
   public void setTaskTimeoutAdjustment(boolean val) {
     this.taskTimeoutAdjustment = val;
   }
 
+  ServerActionExecutor getServerActionExecutor() {
+    return serverActionExecutor;
+  }
+
   static class RoleStats {
     int numInProgress;
     int numQueued = 0;

http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
index bbc5ac3..01f2085 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.actionmanager;
 
+import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -39,7 +40,7 @@ import org.apache.ambari.server.orm.entities.StageEntity;
 import org.apache.ambari.server.serveraction.ServerAction;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.ServiceComponentHostEvent;
-import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostServerActionEvent;
 import org.apache.ambari.server.utils.StageUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -302,14 +303,38 @@ public class Stage {
 
 
   /**
-   *  Creates server-side execution command. As of now, it seems to
-   *  be used only for server upgrade
+   * Creates server-side execution command.
+   * <p/>
+   * The action name for this command is expected to be the classname of a
+   * {@link org.apache.ambari.server.serveraction.ServerAction} implementation which will be
+   * instantiated and invoked as needed.
+   *
+   * @param actionName    a String declaring the action name (in the form of a classname) to execute
+   * @param role          the Role for this command
+   * @param command       the RoleCommand for this command
+   * @param clusterName   a String identifying the cluster on which to to execute this command
+   * @param event         a ServiceComponentHostServerActionEvent
+   * @param commandParams a Map of String to String data used to pass to the action - this may be
+   *                      empty or null if no data is relevant
+   * @param timeout       an Integer declaring the timeout for this action - if null, a default
+   *                      timeout will be used
    */
-  public synchronized void addServerActionCommand(String actionName, Role role,  RoleCommand command, String clusterName,
-      ServiceComponentHostUpgradeEvent event, String hostName) {
-    ExecutionCommandWrapper commandWrapper = addGenericExecutionCommand(clusterName, hostName, role, command, event);
+  public synchronized void addServerActionCommand(String actionName, Role role, RoleCommand command,
+                                                  String clusterName, ServiceComponentHostServerActionEvent event,
+                                                  @Nullable Map<String, String> commandParams,
+                                                  @Nullable Integer timeout) {
+    ExecutionCommandWrapper commandWrapper = addGenericExecutionCommand(clusterName, StageUtils.getHostName(), role, command, event);
     ExecutionCommand cmd = commandWrapper.getExecutionCommand();
-    
+
+    Map<String, String> cmdParams = new HashMap<String, String>();
+    if (commandParams != null) {
+      cmdParams.putAll(commandParams);
+    }
+    if (timeout != null) {
+      cmdParams.put(ExecutionCommand.KeyNames.COMMAND_TIMEOUT, NumberFormat.getIntegerInstance().format(timeout));
+    }
+    cmd.setCommandParams(cmdParams);
+
     Map<String, String> roleParams = new HashMap<String, String>();
     roleParams.put(ServerAction.ACTION_NAME, actionName);
     cmd.setRoleParams(roleParams);

http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
index d74510a..2d91462 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
@@ -66,8 +66,6 @@ import org.apache.ambari.server.scheduler.ExecutionScheduler;
 import org.apache.ambari.server.scheduler.ExecutionSchedulerImpl;
 import org.apache.ambari.server.security.SecurityHelper;
 import org.apache.ambari.server.security.SecurityHelperImpl;
-import org.apache.ambari.server.serveraction.ServerActionManager;
-import org.apache.ambari.server.serveraction.ServerActionManagerImpl;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Config;
@@ -238,7 +236,6 @@ public class ControllerModule extends AbstractModule {
     bind(AmbariManagementController.class)
         .to(AmbariManagementControllerImpl.class);
     bind(AbstractRootServiceResponseFactory.class).to(RootServiceResponseFactory.class);
-    bind(ServerActionManager.class).to(ServerActionManagerImpl.class);
     bind(ExecutionScheduler.class).to(ExecutionSchedulerImpl.class);
     bind(DBAccessor.class).to(DBAccessorImpl.class);
     bind(ViewInstanceHandlerList.class).to(AmbariHandlerList.class);

http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index 6920a9e..cf025b7 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -127,6 +127,17 @@ public class HostRoleCommandDAO {
   }
 
   @RequiresSession
+  public List<Long> findTaskIdsByHostRoleAndStatus(String hostname, String role, HostRoleStatus status) {
+    TypedQuery<Long> query = entityManagerProvider.get().createQuery(
+        "SELECT DISTINCT task.taskId FROM HostRoleCommandEntity task " +
+            "WHERE task.hostName=?1 AND task.role=?2 AND task.status=?3 " +
+            "ORDER BY task.taskId", Long.class
+    );
+
+    return daoUtils.selectList(query, hostname, role, status);
+  }
+
+  @RequiresSession
   public List<HostRoleCommandEntity> findSortedCommandsByStageAndHost(StageEntity stageEntity, HostEntity hostEntity) {
     TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery("SELECT hostRoleCommand " +
         "FROM HostRoleCommandEntity hostRoleCommand " +

http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/serveraction/AbstractServerAction.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/AbstractServerAction.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/AbstractServerAction.java
new file mode 100644
index 0000000..9882e73
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/AbstractServerAction.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.serveraction;
+
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.actionmanager.ExecutionCommandWrapper;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.utils.StageUtils;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * AbstractServerActionImpl is an abstract implementation of a ServerAction.
+ * <p/>
+ * This abstract implementation provides common facilities for all ServerActions, such as
+ * maintaining the ExecutionCommand and HostRoleCommand properties. It also provides a convenient
+ * way to generate CommandReports for reporting status.
+ */
+public abstract class AbstractServerAction implements ServerAction {
+  /**
+   * The ExecutionCommand containing data related to this ServerAction implementation
+   */
+  private ExecutionCommand executionCommand = null;
+
+  /**
+   * The HostRoleCommand containing data related to this ServerAction implementation
+   */
+  private HostRoleCommand hostRoleCommand = null;
+
+  @Override
+  public ExecutionCommand getExecutionCommand() {
+    return this.executionCommand;
+  }
+
+  @Override
+  public void setExecutionCommand(ExecutionCommand executionCommand) {
+    this.executionCommand = executionCommand;
+  }
+
+  @Override
+  public HostRoleCommand getHostRoleCommand() {
+    return this.hostRoleCommand;
+  }
+
+  @Override
+  public void setHostRoleCommand(HostRoleCommand hostRoleCommand) {
+    this.hostRoleCommand = hostRoleCommand;
+  }
+
+  /**
+   * Creates a CommandReport used to report back to Ambari the status of this ServerAction.
+   *
+   * @param exitCode      an integer value declaring the exit code for this action - 0 typically
+   *                      indicates success.
+   * @param status        a HostRoleStatus indicating the status of this action
+   * @param structuredOut a String containing the (typically) JSON-formatted data representing the
+   *                      output from this action (this data is stored in the database, along with
+   *                      the command status)
+   * @param stdout        A string containing the data from the standard out stream (this data is stored in
+   *                      the database, along with the command status)
+   * @param stderr        A string containing the data from the standard error stream (this data is stored
+   *                      in the database, along with the command status)
+   * @return the generated CommandReport, or null if the HostRoleCommand or ExecutionCommand
+   * properties are missing
+   */
+  protected CommandReport createCommandReport(int exitCode, HostRoleStatus status, String structuredOut,
+                                              String stdout, String stderr) {
+    CommandReport report = null;
+
+    if (hostRoleCommand != null) {
+      if (executionCommand == null) {
+        ExecutionCommandWrapper wrapper = hostRoleCommand.getExecutionCommandWrapper();
+
+        if (wrapper != null) {
+          executionCommand = wrapper.getExecutionCommand();
+        }
+      }
+
+      if (executionCommand != null) {
+        RoleCommand roleCommand = executionCommand.getRoleCommand();
+
+        report = new CommandReport();
+
+        report.setActionId(StageUtils.getActionId(hostRoleCommand.getRequestId(), hostRoleCommand.getStageId()));
+        report.setClusterName(executionCommand.getClusterName());
+        report.setConfigurationTags(executionCommand.getConfigurationTags());
+        report.setRole(executionCommand.getRole());
+        report.setRoleCommand((roleCommand == null) ? null : roleCommand.toString());
+        report.setServiceName(executionCommand.getServiceName());
+        report.setTaskId(executionCommand.getTaskId());
+
+        report.setStructuredOut(structuredOut);
+        report.setStdErr((stderr == null) ? "" : stderr);
+        report.setStdOut((stdout == null) ? "" : stdout);
+        report.setStatus((status == null) ? null : status.toString());
+        report.setExitCode(exitCode);
+      }
+    }
+
+    return report;
+  }
+
+  /**
+   * Returns the command parameters value from the ExecutionCommand
+   * <p/>
+   * The returned map should be assumed to be read-only.
+   *
+   * @return the (assumed read-only) command parameters value from the ExecutionCommand
+   */
+  protected Map<String, String> getCommandParameters() {
+    if (executionCommand == null) {
+      return Collections.emptyMap();
+    } else {
+      return executionCommand.getCommandParams();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerAction.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerAction.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerAction.java
index be885b5..99e3029 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerAction.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerAction.java
@@ -18,23 +18,67 @@
 
 package org.apache.ambari.server.serveraction;
 
-public class ServerAction {
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.agent.ExecutionCommand;
+
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * ServerAction is an interface to be implemented by all server-based actions/tasks.
+ */
+public interface ServerAction {
 
   public static final String ACTION_NAME = "ACTION_NAME";
 
+
+  /**
+   * Gets the ExecutionCommand property of this ServerAction.
+   *
+   * @return the ExecutionCommand property of this ServerAction
+   */
+  ExecutionCommand getExecutionCommand();
+
+  /**
+   * Sets the ExecutionCommand property of this ServerAction.
+   * <p/>
+   * This property is expected to be set by the creator of this ServerAction before calling execute.
+   *
+   * @param command the ExecutionCommand data to set
+   */
+  void setExecutionCommand(ExecutionCommand command);
+
+
+  /**
+   * Gets the HostRoleCommand property of this ServerAction.
+   *
+   * @return the HostRoleCommand property of this ServerAction
+   */
+  HostRoleCommand getHostRoleCommand();
+
+  /**
+   * Sets the HostRoleCommand property of this ServerAction.
+   * <p/>
+   * This property is expected to be set by the creator of this ServerAction before calling execute.
+   *
+   * @param hostRoleCommand the HostRoleCommand data to set
+   */
+  void setHostRoleCommand(HostRoleCommand hostRoleCommand);
+
   /**
-   * The commands supported by the server. A command is a named alias to the
-   * action implementation at the server
+   * Executes this ServerAction
+   * <p/>
+   * This is typically called by the ServerActionExecutor in it's own thread, but there is no
+   * guarantee that this is the case.  It is expected that the ExecutionCommand and HostRoleCommand
+   * properties are set before calling this method.
+   *
+   * @param requestSharedDataContext a Map to be used a shared data among all ServerActions related
+   *                                 to a given request
+   * @return a CommandReport declaring the status of the task
+   * @throws AmbariException
+   * @throws InterruptedException
    */
-  public static class Command {
-    /**
-     * Finalize the upgrade request
-     */
-    public static final String FINALIZE_UPGRADE = "FINALIZE_UPGRADE";
-  }
-
-  public static class PayloadName {
-    public final static String CURRENT_STACK_VERSION = "current_stack_version";
-    public final static String CLUSTER_NAME = "cluster_name";
-  }
+  CommandReport execute(ConcurrentMap<String, Object> requestSharedDataContext)
+      throws AmbariException, InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionExecutor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionExecutor.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionExecutor.java
new file mode 100644
index 0000000..880c596
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionExecutor.java
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.serveraction;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.actionmanager.*;
+import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.utils.StageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Server Action Executor used to execute server-side actions (or tasks)
+ * <p/>
+ * The ServerActionExecutor executes in its own thread, polling for AMBARI_SERVER_ACTION
+ * HostRoleCommands queued for execution.  It is expected that this thread is managed by the
+ * ActionScheduler such that it is started when the ActionScheduler is started and stopped when the
+ * ActionScheduler is stopped.
+ */
+public class ServerActionExecutor {
+
+  private final static Logger LOG = LoggerFactory.getLogger(ServerActionExecutor.class);
+  private final static Long EXECUTION_TIMEOUT_MS = 1000L * 60 * 5;
+  private final static Long POLLING_TIMEOUT_MS = 1000L * 5;
+
+  /**
+   * Maps request IDs to "blackboards" of shared data.
+   * <p/>
+   * This map is not synchronized, so any access to it should synchronize on
+   * requestSharedDataMap object
+   */
+  private final Map<Long, ConcurrentMap<String, Object>> requestSharedDataMap =
+      new HashMap<Long, ConcurrentMap<String, Object>>();
+
+  /**
+   * The hostname of the (Ambari) server.
+   * <p/>
+   * This hostname is cached so that cycles are spent querying for it more than once.
+   */
+  private final String serverHostName;
+
+  /**
+   * Database accessor to query and update the database of action commands.
+   */
+  private final ActionDBAccessor db;
+
+  /**
+   * Internal locking object used to manage access to activeAwakeRequest.
+   */
+  private final Object wakeupSyncObject = new Object();
+
+  /**
+   * Timeout (in milliseconds) used to throttle polling of database for new action commands.
+   */
+  private final long sleepTimeMS;
+
+  /**
+   * Flag used to help keep thing moving in the event an "awake" request was encountered while busy
+   * handing an action.
+   */
+  private boolean activeAwakeRequest = false;
+
+  /**
+   * A reference to the Thread handling the work for this ServerActionExecutor
+   */
+  private Thread executorThread = null;
+
+  /**
+   * Creates a new ServerActionExecutor
+   *
+   * @param db          the ActionDBAccessor to use to read and update tasks
+   * @param sleepTimeMS the time (in milliseconds) to wait between polling the database for more tasks
+   */
+  public ServerActionExecutor(ActionDBAccessor db, long sleepTimeMS) {
+    this.serverHostName = StageUtils.getHostName();
+    this.db = db;
+    this.sleepTimeMS = (sleepTimeMS < 1) ? POLLING_TIMEOUT_MS : sleepTimeMS;
+  }
+
+  /**
+   * Starts this ServerActionExecutor's main thread.
+   */
+  public void start() {
+    LOG.info("Starting Server Action Executor thread...");
+    executorThread = new Thread(new Runnable() {
+
+      @Override
+      public void run() {
+        while (!Thread.interrupted()) {
+          try {
+            synchronized (wakeupSyncObject) {
+              if (!activeAwakeRequest) {
+                wakeupSyncObject.wait(sleepTimeMS);
+              }
+              activeAwakeRequest = false;
+            }
+
+            doWork();
+          } catch (InterruptedException e) {
+            LOG.warn("Server Action Executor thread interrupted, starting to shutdown...");
+            break;
+          }
+        }
+
+        LOG.info("Server Action Executor thread shutting down...");
+      }
+    }, "Server Action Executor");
+    executorThread.start();
+
+    if (executorThread.isAlive()) {
+      LOG.info("Server Action Executor thread started.");
+    }
+  }
+
+  /**
+   * Attempts to stop this ServerActionExecutor's main thread.
+   */
+  public void stop() {
+    LOG.info("Stopping Server Action Executor thread...");
+
+    if (executorThread != null) {
+      executorThread.interrupt();
+
+      // Wait for about 60 seconds for the thread to stop
+      for (int i = 0; i < 120; i++) {
+        try {
+          executorThread.join(500);
+        } catch (InterruptedException e) {
+          // Ignore this...
+        }
+
+        if (!executorThread.isAlive()) {
+          break;
+        }
+      }
+
+      if (!executorThread.isAlive()) {
+        executorThread = null;
+      }
+    }
+
+    if (executorThread == null) {
+      LOG.info("Server Action Executor thread stopped.");
+    } else {
+      LOG.warn("Server Action Executor thread hasn't stopped, giving up waiting.");
+    }
+  }
+
+  /**
+   * Attempts to force this ServerActionExecutor to wake up and do work.
+   * <p/>
+   * Should be called from another thread when we want scheduler to
+   * make a run ASAP (for example, to process desired configs of SCHs).
+   * The method is guaranteed to return quickly.
+   */
+  public void awake() {
+    synchronized (wakeupSyncObject) {
+      activeAwakeRequest = true;
+      wakeupSyncObject.notify();
+    }
+  }
+
+  /**
+   * Returns a Map to be used to share data among server actions within a given request context.
+   *
+   * @param requestId a long identifying the id of the relevant request
+   * @return a ConcurrentMap of "shared" data
+   */
+  private ConcurrentMap<String, Object> getRequestSharedDataContext(long requestId) {
+    synchronized (requestSharedDataMap) {
+      ConcurrentMap<String, Object> map = requestSharedDataMap.get(requestId);
+
+      if (map == null) {
+        map = new ConcurrentHashMap<String, Object>();
+        requestSharedDataMap.put(requestId, map);
+      }
+
+      return map;
+    }
+  }
+
+  /**
+   * Cleans up orphaned shared data Maps due to completed or failed request contexts.
+   */
+  private void cleanRequestShareDataContexts() {
+    // Clean out any orphaned request shared data contexts
+    for (RequestStatus status : EnumSet.of(RequestStatus.FAILED, RequestStatus.COMPLETED)) {
+      List<Long> requests = db.getRequestsByStatus(status, 100, true);
+
+      if (requests != null) {
+        synchronized (requestSharedDataMap) {
+          for (Long requestId : requests) {
+            requestSharedDataMap.remove(requestId);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * A helper method to create CommandReports indicating the action/task is in progress
+   *
+   * @return a new CommandReport
+   */
+  private CommandReport createInProgressReport() {
+    CommandReport commandReport = new CommandReport();
+    commandReport.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+    commandReport.setStdErr("");
+    commandReport.setStdOut("");
+    return commandReport;
+  }
+
+  /**
+   * A helper method to create CommandReports indicating the action/task had timed out
+   *
+   * @return a new CommandReport
+   */
+  private CommandReport createTimedOutReport() {
+    CommandReport commandReport = new CommandReport();
+    commandReport.setStatus(HostRoleStatus.TIMEDOUT.toString());
+    commandReport.setStdErr("");
+    commandReport.setStdOut("");
+    return commandReport;
+  }
+
+  /**
+   * A helper method to create CommandReports indicating the action/task has had an error
+   *
+   * @param message a String containing the error message to report
+   * @return a new CommandReport
+   */
+  private CommandReport createErrorReport(String message) {
+    CommandReport commandReport = new CommandReport();
+    commandReport.setStatus(HostRoleStatus.FAILED.toString());
+    commandReport.setExitCode(1);
+    commandReport.setStdOut("Server action failed");
+    commandReport.setStdErr(message);
+    return commandReport;
+  }
+
+  /**
+   * Stores the status of the task/action
+   * <p/>
+   * If the command report is not specified (null), an error report will be created.
+   *
+   * @param hostRoleCommand  the HostRoleCommand for the relevant task
+   * @param executionCommand the ExecutionCommand for the relevant task
+   * @param commandReport    the CommandReport to store
+   */
+  private void updateHostRoleState(HostRoleCommand hostRoleCommand, ExecutionCommand executionCommand,
+                                   CommandReport commandReport) {
+    if (commandReport == null) {
+      commandReport = createErrorReport("Unknown error condition");
+    }
+
+    db.updateHostRoleState(executionCommand.getHostname(), hostRoleCommand.getRequestId(),
+        hostRoleCommand.getStageId(), executionCommand.getRole(), commandReport);
+  }
+
+  /**
+   * Determine what the timeout for this action/task should be.
+   * <p/>
+   * If the timeout value is not set in the command parameter map (under the key
+   * ExecutionCommand.KeyNames.COMMAND_TIMEOUT or "command_timeout", the default timeout value will
+   * be used.  It is expected that the timeout value stored in the command parameter map (if any) is
+   * in seconds.
+   *
+   * @param executionCommand the ExecutionCommand for the relevant task
+   * @return a long declaring the action/task's timeout
+   */
+  private long determineTimeout(ExecutionCommand executionCommand) {
+    Map<String, String> params = executionCommand.getCommandParams();
+    String paramsTimeout = (params == null) ? null : params.get(ExecutionCommand.KeyNames.COMMAND_TIMEOUT);
+    Long timeout;
+
+    try {
+      timeout = (paramsTimeout == null)
+          ? null
+          : (Long.parseLong(paramsTimeout) * 1000); // Convert seconds to milliseconds
+    } catch (NumberFormatException e) {
+      timeout = null;
+    }
+
+    return (timeout == null)
+        ? EXECUTION_TIMEOUT_MS
+        : ((timeout < 0) ? 0 : timeout);
+  }
+
+  /**
+   * Execute the logic to handle each task in the queue in the order in which it was queued.
+   * <p/>
+   * A single task is executed at one time, allowing for a specified (ExecutionCommand.KeyNames.COMMAND_TIMEOUT)
+   * or the default timeout for it to complete before considering the task timed out.
+   *
+   * @throws InterruptedException
+   */
+  public void doWork() throws InterruptedException {
+    List<HostRoleCommand> tasks = db.getTasksByHostRoleAndStatus(serverHostName,
+        Role.AMBARI_SERVER_ACTION.toString(), HostRoleStatus.QUEUED);
+
+    if ((tasks != null) && !tasks.isEmpty()) {
+      for (HostRoleCommand task : tasks) {
+        Long taskId = task.getTaskId();
+
+        LOG.debug("Processing task #{}", taskId);
+
+        if (task.getStatus() == HostRoleStatus.QUEUED) {
+          ExecutionCommandWrapper executionWrapper = task.getExecutionCommandWrapper();
+
+          if (executionWrapper != null) {
+            ExecutionCommand executionCommand = executionWrapper.getExecutionCommand();
+
+            if (executionCommand != null) {
+              // For now, execute only one task at a time. This may change in the future in the
+              // event it is discovered that this is a bottleneck. Since this implementation may
+              // change, it should be noted from outside of this class, that there is no expectation
+              // that tasks will be processed in order or serially.
+              Worker worker = new Worker(task, executionCommand);
+              Thread workerThread = new Thread(worker, String.format("Server Action Executor Worker %s", taskId));
+              Long timeout = determineTimeout(executionCommand);
+
+              updateHostRoleState(task, executionCommand, createInProgressReport());
+
+              LOG.debug("Starting Server Action Executor Worker thread for task #{}.", taskId);
+              workerThread.start();
+
+              try {
+                workerThread.join(timeout);
+              } catch (InterruptedException e) {
+                // Make sure the workerThread is interrupted as well.
+                workerThread.interrupt();
+                throw e;
+              }
+
+              if (workerThread.isAlive()) {
+                LOG.debug("Server Action Executor Worker thread for task #{} timed out - it failed to complete within {} ms.",
+                    taskId, timeout);
+                workerThread.interrupt();
+                updateHostRoleState(task, executionCommand, createTimedOutReport());
+              } else {
+                LOG.debug("Server Action Executor Worker thread for task #{} exited on its own.", taskId);
+                updateHostRoleState(task, executionCommand, worker.getCommandReport());
+              }
+            } else {
+              LOG.warn("Task #{} failed to produce an ExecutionCommand, skipping.", taskId);
+            }
+          } else {
+            LOG.warn("Task #{} failed to produce an ExecutionCommandWrapper, skipping.", taskId);
+          }
+        } else {
+          LOG.warn("Queued task #{} is expected to have a status of {} but has a status of {}, skipping.",
+              taskId, HostRoleStatus.QUEUED, task.getStatus());
+        }
+      }
+    }
+
+    cleanRequestShareDataContexts();
+  }
+
+  /**
+   * Internal class to execute a unit of work in its own thread
+   */
+  private class Worker implements Runnable {
+    /**
+     * The task id of the relevant task
+     */
+    private final Long taskId;
+
+    /**
+     * The HostRoleCommand data used by this Worker to execute the task
+     */
+    private final HostRoleCommand hostRoleCommand;
+
+    /**
+     * The ExecutionCommand data used by this Worker to execute the task
+     */
+    private final ExecutionCommand executionCommand;
+
+    /**
+     * The resulting CommandReport used by the caller to update the status of the relevant task
+     */
+    private CommandReport commandReport = null;
+
+    @Override
+    public void run() {
+      try {
+        LOG.debug("Executing task #{}", taskId);
+
+        commandReport = execute(hostRoleCommand, executionCommand);
+
+        LOG.debug("Task #{} completed execution with status of {}",
+            taskId, (commandReport == null) ? "UNKNOWN" : commandReport.getStatus());
+      } catch (Throwable t) {
+        LOG.warn("Task #{} failed to complete execution due to thrown exception: {}:{}",
+            taskId, t.getClass().getName(), t.getLocalizedMessage());
+
+        commandReport = createErrorReport(t.getLocalizedMessage());
+      }
+    }
+
+    /**
+     * Returns the resulting CommandReport
+     *
+     * @return a CommandReport
+     */
+    public CommandReport getCommandReport() {
+      return commandReport;
+    }
+
+    /**
+     * Attempts to execute the task specified using data from the supplied HostRoleCommand and
+     * ExecutionCommand.
+     * <p/>
+     * Retrieves the role parameters from the supplied ExecutionCommand and queries it for the
+     * "ACTON_NAME" property.  The returned String is expected to be the classname of a ServerAction
+     * implementation.  If so, an instance of the implementation class is created and executed
+     * yielding a CommandReport to (eventually) return back to the parent thread.
+     *
+     * @param hostRoleCommand  The HostRoleCommand the HostRoleCommand for the relevant task
+     * @param executionCommand the ExecutionCommand for the relevant task
+     * @return the resulting CommandReport
+     * @throws AmbariException
+     * @throws InterruptedException
+     */
+    private CommandReport execute(HostRoleCommand hostRoleCommand, ExecutionCommand executionCommand)
+        throws AmbariException, InterruptedException {
+
+      if (hostRoleCommand == null) {
+        throw new AmbariException("Missing HostRoleCommand data");
+      } else if (executionCommand == null) {
+        throw new AmbariException("Missing ExecutionCommand data");
+      } else {
+        Map<String, String> roleParams = executionCommand.getRoleParams();
+
+        if (roleParams == null) {
+          throw new AmbariException("Missing RoleParams data");
+        } else {
+          String actionClassname = roleParams.get(ServerAction.ACTION_NAME);
+
+          if (actionClassname == null) {
+            throw new AmbariException("Missing action classname for server action");
+          } else {
+            ServerAction action = createServerAction(actionClassname);
+
+            if (action == null) {
+              throw new AmbariException("Failed to create server action: " + actionClassname);
+            } else {
+              // Set properties on the action:
+              action.setExecutionCommand(executionCommand);
+              action.setHostRoleCommand(hostRoleCommand);
+
+              return action.execute(getRequestSharedDataContext(hostRoleCommand.getRequestId()));
+            }
+          }
+        }
+      }
+    }
+
+    /**
+     * Attempts to create an instance of the ServerAction class implementation specified in
+     * classname.
+     *
+     * @param classname a String declaring the classname of the ServerAction class to instantiate
+     * @return the instantiated ServerAction implementation
+     * @throws AmbariException
+     */
+    private ServerAction createServerAction(String classname) throws AmbariException {
+      try {
+        Class<?> actionClass = Class.forName(classname);
+
+        if (actionClass == null) {
+          throw new AmbariException("Unable to load server action class: " + classname);
+        } else {
+          Class<? extends ServerAction> serverActionClass = actionClass.asSubclass(ServerAction.class);
+
+          if (serverActionClass == null) {
+            throw new AmbariException("Unable to execute server action class, invalid type: " + classname);
+          } else {
+            return serverActionClass.newInstance();
+          }
+        }
+      } catch (ClassNotFoundException e) {
+        throw new AmbariException("Unable to load server action class: " + classname, e);
+      } catch (InstantiationException e) {
+        throw new AmbariException("Unable to create an instance of the server action class: " + classname, e);
+      } catch (IllegalAccessException e) {
+        throw new AmbariException("Unable to create an instance of the server action class: " + classname, e);
+      }
+    }
+
+    /**
+     * Constructs a new Worker used to execute a task
+     *
+     * @param hostRoleCommand  the HostRoleCommand for the relevant task
+     * @param executionCommand the ExecutionCommand for the relevant task
+     */
+    private Worker(HostRoleCommand hostRoleCommand, ExecutionCommand executionCommand) {
+      this.taskId = hostRoleCommand.getTaskId();
+      this.hostRoleCommand = hostRoleCommand;
+      this.executionCommand = executionCommand;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManager.java
deleted file mode 100644
index 011cf06..0000000
--- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManager.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.server.serveraction;
-
-import org.apache.ambari.server.AmbariException;
-
-import java.util.Map;
-
-/**
- * Server action manager interface.
- */
-public interface ServerActionManager {
-
-  public void executeAction(String actionName, Map<String, String> payload)
-      throws AmbariException;
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManagerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManagerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManagerImpl.java
deleted file mode 100644
index 3a16c77..0000000
--- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManagerImpl.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.server.serveraction;
-
-import com.google.inject.Singleton;
-import com.google.inject.Inject;
-import org.apache.ambari.server.AmbariException;
-import org.apache.ambari.server.state.Cluster;
-import org.apache.ambari.server.state.Clusters;
-import org.apache.ambari.server.state.StackId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-/**
- * Server action manager implementation.
- */
-@Singleton
-public class ServerActionManagerImpl implements ServerActionManager {
-
-  private final static Logger LOG =
-      LoggerFactory.getLogger(ServerActionManagerImpl.class);
-
-  private Clusters clusters;
-
-  @Inject
-  public ServerActionManagerImpl(Clusters clusters) {
-    this.clusters = clusters;
-  }
-
-  @Override
-  public void executeAction(String actionName, Map<String, String> payload)
-      throws AmbariException {
-    LOG.info("Executing server action : "
-        + actionName + " with payload "
-        + payload);
-
-    if (actionName.equals(ServerAction.Command.FINALIZE_UPGRADE)) {
-      updateClusterStackVersion(payload);
-    } else {
-      throw new AmbariException("Unsupported action " + actionName);
-    }
-  }
-
-  private void updateClusterStackVersion(Map<String, String> payload) throws AmbariException {
-    if (payload == null
-        || !payload.containsKey(ServerAction.PayloadName.CLUSTER_NAME)
-        || !payload.containsKey(ServerAction.PayloadName.CURRENT_STACK_VERSION)) {
-      throw new AmbariException("Invalid payload.");
-    }
-
-    StackId currentStackId = new StackId(payload.get(ServerAction.PayloadName.CURRENT_STACK_VERSION));
-    final Cluster cluster = clusters.getCluster(payload.get(ServerAction.PayloadName.CLUSTER_NAME));
-    cluster.setCurrentStackVersion(currentStackId);
-    cluster.refresh();
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEvent.java
index 78590fc..f624f74 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEvent.java
@@ -122,6 +122,8 @@ public abstract class ServiceComponentHostEvent
         return new ServiceComponentHostDisableEvent(serviceComponentName, hostName, opTimestamp);
       case HOST_SVCCOMP_RESTORE:
         return new ServiceComponentHostRestoreEvent(serviceComponentName, hostName, opTimestamp);
+      case HOST_SVCCOMP_SERVER_ACTION:
+        return new ServiceComponentHostServerActionEvent(serviceComponentName, hostName, opTimestamp);
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEventType.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEventType.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEventType.java
index b43ac9c..e744a7e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEventType.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEventType.java
@@ -74,6 +74,10 @@ public enum ServiceComponentHostEventType {
   /**
    * Recovering host component from disable state
    */
-  HOST_SVCCOMP_RESTORE
+  HOST_SVCCOMP_RESTORE,
+  /**
+   * Triggering a server-side action
+   */
+  HOST_SVCCOMP_SERVER_ACTION
 
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostServerActionEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostServerActionEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostServerActionEvent.java
new file mode 100644
index 0000000..c6c8b25
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostServerActionEvent.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.state.svccomphost;
+
+import org.apache.ambari.server.state.ServiceComponentHostEvent;
+import org.apache.ambari.server.state.ServiceComponentHostEventType;
+
+/**
+ * Base class for all events that represent server-side actions.
+ */
+public class ServiceComponentHostServerActionEvent extends
+    ServiceComponentHostEvent {
+
+  /**
+   * Constructs a new ServiceComponentHostServerActionEvent.
+   * <p/>
+   * This method is expected to be called by ether a ServiceComponentHostServerActionEvent or a
+   * class that extends it.
+   *
+   * @param type                 the ServiceComponentHostEventType - expected to be
+   *                             ServiceComponentHostEventType.HOST_SVCCOMP_SERVER_ACTION
+   * @param serviceComponentName a String declaring the component for which this action is to be
+   *                             routed - expected to be "AMBARI_SERVER"
+   * @param hostName             a String declaring the host on which the action should be executed -
+   *                             expected to be the hostname of the Ambari server
+   * @param opTimestamp          the time in which this event was created
+   * @param stackId              the relevant stackid
+   */
+  protected ServiceComponentHostServerActionEvent(ServiceComponentHostEventType type,
+                                                  String serviceComponentName, String hostName,
+                                                  long opTimestamp, String stackId) {
+    super(type, serviceComponentName, hostName, opTimestamp, stackId);
+  }
+
+  /**
+   * Constructs a new ServiceComponentHostServerActionEvent where the component name is set to
+   * "AMBARI_SERVER" and the type is set to ServiceComponentHostEventType.HOST_SVCCOMP_SERVER_ACTION.
+   *
+   * @param hostName    a String declaring the host on which the action should be executed -
+   *                    expected to be the hostname of the Ambari server
+   * @param opTimestamp the time in which this event was created
+   */
+  public ServiceComponentHostServerActionEvent(String hostName, long opTimestamp) {
+    this("AMBARI_SERVER", hostName, opTimestamp);
+  }
+
+  /**
+   * Constructs a new ServiceComponentHostServerActionEvent
+   *
+   * @param serviceComponentName a String declaring the name of component
+   * @param hostName             a String declaring the host on which the action should be executed -
+   *                             expected to be the hostname of the Ambari server
+   * @param opTimestamp          the time in which this event was created
+   */
+  public ServiceComponentHostServerActionEvent(String serviceComponentName, String hostName,
+                                               long opTimestamp) {
+    this(ServiceComponentHostEventType.HOST_SVCCOMP_SERVER_ACTION, serviceComponentName, hostName,
+        opTimestamp, "");
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostUpgradeEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostUpgradeEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostUpgradeEvent.java
index 8b375fe..dd6817d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostUpgradeEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostUpgradeEvent.java
@@ -18,11 +18,10 @@
 
 package org.apache.ambari.server.state.svccomphost;
 
-import org.apache.ambari.server.state.ServiceComponentHostEvent;
 import org.apache.ambari.server.state.ServiceComponentHostEventType;
 
 public class ServiceComponentHostUpgradeEvent extends
-    ServiceComponentHostEvent {
+    ServiceComponentHostServerActionEvent {
 
 
   public ServiceComponentHostUpgradeEvent(String serviceComponentName,

http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
index 36acbc2..3da931f 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
@@ -43,6 +43,7 @@ import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
 import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
 import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
+import org.apache.ambari.server.serveraction.MockServerAction;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartEvent;
 import org.apache.ambari.server.utils.StageUtils;
@@ -52,6 +53,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import static org.junit.Assert.assertEquals;
@@ -67,6 +69,10 @@ public class TestActionDBAccessorImpl {
   private String hostName = "host1";
   private String clusterName = "cluster1";
   private String actionName = "validate_kerberos";
+
+  private String serverHostName = StageUtils.getHostName(); // "_localhost_";
+  private String serverActionName = MockServerAction.class.getName();
+
   private Injector injector;
   ActionDBAccessor db;
   ActionManager am;
@@ -85,13 +91,18 @@ public class TestActionDBAccessorImpl {
       .with(new TestActionDBAccessorModule()));
     injector.getInstance(GuiceJpaInitializer.class);
     injector.injectMembers(this);
+
+    // Add this host's name since it is needed for server-side actions.
+    clusters.addHost(serverHostName);
+    clusters.getHost(serverHostName).persist();
+
     clusters.addHost(hostName);
     clusters.getHost(hostName).persist();
     clusters.addCluster(clusterName);
     db = injector.getInstance(ActionDBAccessorImpl.class);
 
     am = new ActionManager(5000, 1200000, new ActionQueue(), clusters, db,
-        new HostsMap((String) null), null, injector.getInstance(UnitOfWork.class),
+        new HostsMap((String) null), injector.getInstance(UnitOfWork.class),
 		injector.getInstance(RequestFactory.class), null);
   }
 
@@ -157,7 +168,7 @@ public class TestActionDBAccessorImpl {
             "(command report status should be ignored)",
             HostRoleStatus.ABORTED,s.getHostRoleStatus(hostname, "HBASE_MASTER"));
   }
-  
+
   @Test
   public void testGetStagesInProgress() throws AmbariException {
     String hostname = "host1";
@@ -168,7 +179,7 @@ public class TestActionDBAccessorImpl {
     db.persistActions(request);
     assertEquals(2, stages.size());
   }
-  
+
   @Test
   public void testGetStagesInProgressWithFailures() throws AmbariException {
     String hostname = "host1";
@@ -268,6 +279,46 @@ public class TestActionDBAccessorImpl {
   }
 
   @Test
+  public void testServerActionScheduled() throws InterruptedException, AmbariException {
+    populateActionDBWithServerAction(db, serverHostName, requestId, stageId);
+
+    final String roleName = Role.AMBARI_SERVER_ACTION.toString();
+    Stage stage = db.getStage(StageUtils.getActionId(requestId, stageId));
+    assertEquals(HostRoleStatus.PENDING, stage.getHostRoleStatus(serverHostName, roleName));
+    List<HostRoleCommandEntity> entities =
+        hostRoleCommandDAO.findByHostRole(serverHostName, requestId, stageId, roleName);
+
+    assertEquals(HostRoleStatus.PENDING, entities.get(0).getStatus());
+    stage.setHostRoleStatus(serverHostName, roleName, HostRoleStatus.QUEUED);
+
+    entities = hostRoleCommandDAO.findByHostRole(serverHostName, requestId, stageId, roleName);
+    assertEquals(HostRoleStatus.QUEUED, stage.getHostRoleStatus(serverHostName, roleName));
+    assertEquals(HostRoleStatus.PENDING, entities.get(0).getStatus());
+
+    db.hostRoleScheduled(stage, serverHostName, roleName);
+
+    entities = hostRoleCommandDAO.findByHostRole(
+        serverHostName, requestId, stageId, roleName);
+    assertEquals(HostRoleStatus.QUEUED, entities.get(0).getStatus());
+
+
+    Thread thread = new Thread() {
+      @Override
+      public void run() {
+        Stage stage1 = db.getStage("23-31");
+        stage1.setHostRoleStatus(serverHostName, roleName, HostRoleStatus.COMPLETED);
+        db.hostRoleScheduled(stage1, serverHostName, roleName);
+      }
+    };
+
+    thread.start();
+    thread.join();
+
+    entities = hostRoleCommandDAO.findByHostRole(serverHostName, requestId, stageId, roleName);
+    assertEquals("Concurrent update failed", HostRoleStatus.COMPLETED, entities.get(0).getStatus());
+  }
+
+  @Test
   public void testUpdateHostRole() throws Exception {
     populateActionDB(db, hostName, requestId, stageId);
     StringBuilder sb = new StringBuilder();
@@ -310,7 +361,7 @@ public class TestActionDBAccessorImpl {
     populateActionDB(db, hostName, requestId + 1, stageId);
     List<Long> requestIdsResult =
       db.getRequestsByStatus(null, BaseRequest.DEFAULT_PAGE_SIZE, false);
-    
+
     assertNotNull("List of request IDs is null", requestIdsResult);
     assertEquals("Request IDs not matches", requestIds, requestIdsResult);
   }
@@ -520,7 +571,26 @@ public class TestActionDBAccessorImpl {
     List<RequestResourceFilter> resourceFilters = new
       ArrayList<RequestResourceFilter>() {{ add(resourceFilter); }};
     ExecuteActionRequest executeActionRequest = new ExecuteActionRequest
-      ("cluster1", null, actionName, resourceFilters, null, null, false);
+        ("cluster1", null, actionName, resourceFilters, null, null, false);
+    Request request = new Request(stages, clusters);
+    db.persistActions(request);
+  }
+
+  private void populateActionDBWithServerAction(ActionDBAccessor db, String hostname,
+                                                long requestId, long stageId) throws AmbariException {
+    Stage s = new Stage(requestId, "/a/b", "cluster1", 1L, "action db accessor test",
+        "", "commandParamsStage", "hostParamsStage");
+    s.setStageId(stageId);
+    s.addServerActionCommand(serverActionName, Role.AMBARI_SERVER_ACTION, RoleCommand.ACTIONEXECUTE, clusterName, null, null, 300);
+    List<Stage> stages = new ArrayList<Stage>();
+    stages.add(s);
+    final RequestResourceFilter resourceFilter = new RequestResourceFilter("AMBARI", "SERVER", Arrays.asList(hostname));
+    List<RequestResourceFilter> resourceFilters = new
+        ArrayList<RequestResourceFilter>() {{
+          add(resourceFilter);
+        }};
+    ExecuteActionRequest executeActionRequest = new ExecuteActionRequest
+        ("cluster1", null, serverActionName, resourceFilters, null, null, false);
     Request request = new Request(stages, clusters);
     db.persistActions(request);
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
index 5a2c467..ed1318c 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
@@ -84,7 +84,7 @@ public class TestActionManager {
   public void testActionResponse() throws AmbariException {
     ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
     ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(),
-        clusters, db, new HostsMap((String) null), null, unitOfWork,
+        clusters, db, new HostsMap((String) null), unitOfWork,
         injector.getInstance(RequestFactory.class), null);
     populateActionDB(db, hostname);
     Stage stage = db.getAllStages(requestId).get(0);
@@ -127,7 +127,7 @@ public class TestActionManager {
   public void testLargeLogs() throws AmbariException {
     ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
     ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(),
-        clusters, db, new HostsMap((String) null), null, unitOfWork,
+        clusters, db, new HostsMap((String) null), unitOfWork,
         injector.getInstance(RequestFactory.class), null);
     populateActionDB(db, hostname);
     Stage stage = db.getAllStages(requestId).get(0);
@@ -217,7 +217,7 @@ public class TestActionManager {
 
     replay(queue, db, clusters);
 
-    ActionManager manager = new ActionManager(0, 0, queue, clusters, db, null, null, unitOfWork,
+    ActionManager manager = new ActionManager(0, 0, queue, clusters, db, null, unitOfWork,
         injector.getInstance(RequestFactory.class), null);
     assertSame(listStages, manager.getActions(requestId));
 


Mime
View raw message