ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nc...@apache.org
Subject [14/16] ambari git commit: AMBARI-20178. Add authentication for Topology tasks (magyari_sandor)
Date Thu, 02 Mar 2017 18:41:59 GMT
AMBARI-20178. Add authentication for Topology tasks (magyari_sandor)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5ab46f0c
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5ab46f0c
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5ab46f0c

Branch: refs/heads/branch-feature-AMBARI-12556
Commit: 5ab46f0cca5cb2dc6250f44ebe2a3603435706fe
Parents: d481b78
Author: Sandor Magyari <smagyari@hortonworks.com>
Authored: Thu Mar 2 15:15:31 2017 +0100
Committer: Sandor Magyari <smagyari@hortonworks.com>
Committed: Thu Mar 2 18:58:01 2017 +0100

----------------------------------------------------------------------
 .../server/controller/ControllerModule.java     |  14 +-
 .../internal/ClusterResourceProvider.java       |   2 +
 .../InternalAuthenticationInterceptor.java      |  51 ++++++
 .../RunWithInternalSecurityContext.java         |  36 ++++
 .../ambari/server/topology/AmbariContext.java   |   7 +-
 .../server/topology/HostOfferResponse.java      |   2 +-
 .../ambari/server/topology/HostRequest.java     | 178 ++-----------------
 .../server/topology/PersistedStateImpl.java     |   1 +
 .../ambari/server/topology/TopologyManager.java | 101 ++---------
 .../ambari/server/topology/TopologyTask.java    |  47 -----
 .../topology/tasks/ConfigureClusterTask.java    | 122 +++++++++++++
 .../tasks/ConfigureClusterTaskFactory.java      |  30 ++++
 .../server/topology/tasks/InstallHostTask.java  |  70 ++++++++
 .../tasks/PersistHostResourcesTask.java         |  59 ++++++
 .../tasks/RegisterWithConfigGroupTask.java      |  50 ++++++
 .../server/topology/tasks/StartHostTask.java    |  67 +++++++
 .../server/topology/tasks/TopologyHostTask.java |  59 ++++++
 .../server/topology/tasks/TopologyTask.java     |  42 +++++
 .../ambari/server/agent/AgentResourceTest.java  |   3 +
 .../server/state/cluster/ClustersTest.java      |   2 +-
 .../ClusterDeployWithStartOnlyTest.java         |   3 +
 ...InstallWithoutStartOnComponentLevelTest.java |   3 +
 .../ClusterInstallWithoutStartTest.java         |   3 +
 .../topology/ConfigureClusterTaskTest.java      |   7 +-
 .../server/topology/TopologyManagerTest.java    |   3 +
 .../ambari/server/utils/StageUtilsTest.java     |   2 +
 26 files changed, 657 insertions(+), 307 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
index 482d602..4fa2362 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
@@ -18,6 +18,8 @@
 
 package org.apache.ambari.server.controller;
 
+import static com.google.inject.matcher.Matchers.annotatedWith;
+import static com.google.inject.matcher.Matchers.any;
 import static org.eclipse.persistence.config.PersistenceUnitProperties.CREATE_JDBC_DDL_FILE;
 import static org.eclipse.persistence.config.PersistenceUnitProperties.CREATE_ONLY;
 import static org.eclipse.persistence.config.PersistenceUnitProperties.CREATE_OR_EXTEND;
@@ -103,6 +105,8 @@ import org.apache.ambari.server.scheduler.ExecutionSchedulerImpl;
 import org.apache.ambari.server.security.SecurityHelper;
 import org.apache.ambari.server.security.SecurityHelperImpl;
 import org.apache.ambari.server.security.authorization.AuthorizationHelper;
+import org.apache.ambari.server.security.authorization.internal.InternalAuthenticationInterceptor;
+import org.apache.ambari.server.security.authorization.internal.RunWithInternalSecurityContext;
 import org.apache.ambari.server.security.encryption.CredentialStoreService;
 import org.apache.ambari.server.security.encryption.CredentialStoreServiceImpl;
 import org.apache.ambari.server.serveraction.kerberos.KerberosOperationHandlerFactory;
@@ -145,6 +149,7 @@ import org.apache.ambari.server.topology.BlueprintFactory;
 import org.apache.ambari.server.topology.PersistedState;
 import org.apache.ambari.server.topology.PersistedStateImpl;
 import org.apache.ambari.server.topology.SecurityConfigurationFactory;
+import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory;
 import org.apache.ambari.server.view.ViewInstanceHandlerList;
 import org.eclipse.jetty.server.SessionIdManager;
 import org.eclipse.jetty.server.SessionManager;
@@ -396,6 +401,10 @@ public class ControllerModule extends AbstractModule {
     bindNotificationDispatchers(null);
     registerUpgradeChecks(null);
     bind(HookService.class).to(UserHookService.class);
+
+    InternalAuthenticationInterceptor ambariAuthenticationInterceptor = new InternalAuthenticationInterceptor();
+    requestInjection(ambariAuthenticationInterceptor);
+    bindInterceptor(any(), annotatedWith(RunWithInternalSecurityContext.class), ambariAuthenticationInterceptor);
   }
 
   // ----- helper methods ----------------------------------------------------
@@ -461,8 +470,8 @@ public class ControllerModule extends AbstractModule {
         .build(ResourceProviderFactory.class));
 
     install(new FactoryModuleBuilder().implement(
-        ServiceComponent.class, ServiceComponentImpl.class).build(
-        ServiceComponentFactory.class));
+      ServiceComponent.class, ServiceComponentImpl.class).build(
+      ServiceComponentFactory.class));
     install(new FactoryModuleBuilder().implement(
         ServiceComponentHost.class, ServiceComponentHostImpl.class).build(
         ServiceComponentHostFactory.class));
@@ -492,6 +501,7 @@ public class ControllerModule extends AbstractModule {
     install(new FactoryModuleBuilder().implement(HookContext.class, PostUserCreationHookContext.class).build(HookContextFactory.class));
     install(new FactoryModuleBuilder().implement(CollectionPersisterService.class, CsvFilePersisterService.class).build(CollectionPersisterServiceFactory.class));
 
+    install(new FactoryModuleBuilder().build(ConfigureClusterTaskFactory.class));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterResourceProvider.java
index 613ab3f..577659d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterResourceProvider.java
@@ -545,6 +545,8 @@ public class ClusterResourceProvider extends AbstractControllerResourceProvider
       throw new IllegalArgumentException("Topology validation failed: " + e, e);
     } catch (AmbariException e) {
       throw new SystemException("Unknown exception when asking TopologyManager to provision cluster", e);
+    } catch (RuntimeException e) {
+      throw new SystemException("An exception occurred during cluster provisioning: " + e.getMessage(), e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/security/authorization/internal/InternalAuthenticationInterceptor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/security/authorization/internal/InternalAuthenticationInterceptor.java b/ambari-server/src/main/java/org/apache/ambari/server/security/authorization/internal/InternalAuthenticationInterceptor.java
new file mode 100644
index 0000000..e879f07
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/security/authorization/internal/InternalAuthenticationInterceptor.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.security.authorization.internal;
+
+
+import org.aopalliance.intercept.MethodInterceptor;
+import org.aopalliance.intercept.MethodInvocation;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
+
+
+/**
+ * Allows running the annotated method within a security context authenticated with an InternalAuthenticationToken.
+ * This only works for instances created by Guice. If there is already an Authentication in the current security
+ * context, it is restored after the annotated method completes.
+ */
+public class InternalAuthenticationInterceptor implements MethodInterceptor {
+
+  @Override
+  public Object invoke(MethodInvocation invocation) throws Throwable {
+
+    Authentication savedAuthContext = SecurityContextHolder.getContext().getAuthentication();
+    try {
+      RunWithInternalSecurityContext securityAuthContextAnnotation = invocation.getMethod().getAnnotation(RunWithInternalSecurityContext
+        .class);
+      InternalAuthenticationToken authenticationToken = new InternalAuthenticationToken(securityAuthContextAnnotation
+        .token());
+      authenticationToken.setAuthenticated(true);
+      SecurityContextHolder.getContext().setAuthentication(authenticationToken);
+      return invocation.proceed();
+    } finally {
+      SecurityContextHolder.getContext().setAuthentication(savedAuthContext);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/security/authorization/internal/RunWithInternalSecurityContext.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/security/authorization/internal/RunWithInternalSecurityContext.java b/ambari-server/src/main/java/org/apache/ambari/server/security/authorization/internal/RunWithInternalSecurityContext.java
new file mode 100644
index 0000000..3287d47
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/security/authorization/internal/RunWithInternalSecurityContext.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.security.authorization.internal;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Methods annotated with {@code @RunWithInternalSecurityContext} will run within a security context authenticated with an InternalAuthenticationToken.
+ */
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ ElementType.METHOD })
+public @interface RunWithInternalSecurityContext {
+
+  /* internal authentication token */
+  String token();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java
index adca3a3..ce36208 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java
@@ -36,6 +36,7 @@ import javax.inject.Inject;
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.ClusterNotFoundException;
+import org.apache.ambari.server.DuplicateResourceException;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
@@ -187,7 +188,11 @@ public class AmbariContext {
 
     } catch (AmbariException e) {
       LOG.error("Failed to create Cluster resource: ", e);
-      throw new RuntimeException("Failed to create Cluster resource: " + e, e);
+      if (e.getCause() instanceof DuplicateResourceException) {
+        throw new IllegalArgumentException(e);
+      } else {
+        throw new RuntimeException("Failed to create Cluster resource: " + e, e);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java
index e040e42..e220c50 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java
@@ -22,6 +22,7 @@ package org.apache.ambari.server.topology;
 import java.util.List;
 import java.util.concurrent.Executor;
 
+import org.apache.ambari.server.topology.tasks.TopologyTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,7 +80,6 @@ final class HostOfferResponse {
         public void run() {
           for (TopologyTask task : tasks) {
             LOG.info("Running task for accepted host offer for hostname = {}, task = {}", hostName, task.getType());
-            task.init(topology, ambariContext);
             task.run();
           }
         }

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
index a6f677a..9152fd2 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
@@ -25,17 +25,13 @@ import static org.apache.ambari.server.controller.internal.ProvisionAction.START
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.api.predicate.InvalidQueryException;
 import org.apache.ambari.server.api.predicate.PredicateCompiler;
-import org.apache.ambari.server.controller.RequestStatusResponse;
-import org.apache.ambari.server.controller.ShortTaskStatus;
 import org.apache.ambari.server.controller.internal.HostResourceProvider;
-import org.apache.ambari.server.controller.internal.ProvisionAction;
 import org.apache.ambari.server.controller.internal.ResourceImpl;
 import org.apache.ambari.server.controller.internal.Stack;
 import org.apache.ambari.server.controller.spi.Predicate;
@@ -45,9 +41,16 @@ import org.apache.ambari.server.orm.entities.TopologyHostRequestEntity;
 import org.apache.ambari.server.orm.entities.TopologyHostTaskEntity;
 import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity;
 import org.apache.ambari.server.state.host.HostImpl;
+import org.apache.ambari.server.topology.tasks.InstallHostTask;
+import org.apache.ambari.server.topology.tasks.PersistHostResourcesTask;
+import org.apache.ambari.server.topology.tasks.RegisterWithConfigGroupTask;
+import org.apache.ambari.server.topology.tasks.StartHostTask;
+import org.apache.ambari.server.topology.tasks.TopologyTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
+
 /**
  * Represents a set of requests to a single host such as install, start, etc.
  */
@@ -183,10 +186,10 @@ public class HostRequest implements Comparable<HostRequest> {
 
   private void createTasks(boolean skipFailure) {
     // high level topology tasks such as INSTALL, START, ...
-    topologyTasks.add(new PersistHostResourcesTask());
-    topologyTasks.add(new RegisterWithConfigGroupTask());
+    topologyTasks.add(new PersistHostResourcesTask(topology, this));
+    topologyTasks.add(new RegisterWithConfigGroupTask(topology, this));
 
-    InstallHostTask installTask = new InstallHostTask(skipFailure);
+    InstallHostTask installTask = new InstallHostTask(topology, this, skipFailure);
     topologyTasks.add(installTask);
     logicalTaskMap.put(installTask, new HashMap<String, Long>());
 
@@ -195,7 +198,7 @@ public class HostRequest implements Comparable<HostRequest> {
 
     StartHostTask startTask = null;
     if (!skipStartTaskCreate) {
-      startTask = new StartHostTask(skipFailure);
+      startTask = new StartHostTask(topology, this, skipFailure);
       topologyTasks.add(startTask);
       logicalTaskMap.put(startTask, new HashMap<String, Long>());
     } else {
@@ -249,16 +252,16 @@ public class HostRequest implements Comparable<HostRequest> {
   }
 
   private void createTasksForReplay(TopologyHostRequestEntity entity) {
-    topologyTasks.add(new PersistHostResourcesTask());
-    topologyTasks.add(new RegisterWithConfigGroupTask());
-    InstallHostTask installTask = new InstallHostTask(skipFailure);
+    topologyTasks.add(new PersistHostResourcesTask(topology, this));
+    topologyTasks.add(new RegisterWithConfigGroupTask(topology, this));
+    InstallHostTask installTask = new InstallHostTask(topology, this, skipFailure);
     topologyTasks.add(installTask);
     logicalTaskMap.put(installTask, new HashMap<String, Long>());
 
     boolean skipStartTaskCreate = topology.getProvisionAction().equals(INSTALL_ONLY);
 
     if (!skipStartTaskCreate) {
-      StartHostTask startTask = new StartHostTask(skipFailure);
+      StartHostTask startTask = new StartHostTask(topology, this, skipFailure);
       topologyTasks.add(startTask);
       logicalTaskMap.put(startTask, new HashMap<String, Long>());
     }
@@ -433,9 +436,8 @@ public class HostRequest implements Comparable<HostRequest> {
   //todo: once we have logical tasks, move tracking of physical tasks there
   public void registerPhysicalTaskId(long logicalTaskId, long physicalTaskId) {
     physicalTasks.put(logicalTaskId, physicalTaskId);
-
-    topology.getAmbariContext().getPersistedTopologyState().
-        registerPhysicalTask(logicalTaskId, physicalTaskId);
+    topology.getAmbariContext().getPersistedTopologyState().registerPhysicalTask(logicalTaskId, physicalTaskId);
+    getLogicalTask(logicalTaskId).incrementAttemptCount();
   }
 
   private Predicate toPredicate(String predicate) {
@@ -451,152 +453,6 @@ public class HostRequest implements Comparable<HostRequest> {
     return compiledPredicate;
   }
 
-  private class PersistHostResourcesTask implements TopologyTask {
-    private AmbariContext ambariContext;
-
-    @Override
-    public Type getType() {
-      return Type.RESOURCE_CREATION;
-    }
-
-    @Override
-    public void init(ClusterTopology topology, AmbariContext ambariContext) {
-      this.ambariContext = ambariContext;
-    }
-
-    @Override
-    public void run() {
-      LOG.info("HostRequest: Executing RESOURCE_CREATION task for host: {}", hostname);
-      HostGroup group = topology.getBlueprint().getHostGroup(getHostgroupName());
-      Map<String, Collection<String>> serviceComponents = new HashMap<String, Collection<String>>();
-      for (String service : group.getServices()) {
-        serviceComponents.put(service, new HashSet<String> (group.getComponents(service)));
-      }
-      ambariContext.createAmbariHostResources(getClusterId(), getHostName(), serviceComponents);
-    }
-  }
-
-  private class RegisterWithConfigGroupTask implements TopologyTask {
-    private ClusterTopology clusterTopology;
-    private AmbariContext ambariContext;
-
-    @Override
-    public Type getType() {
-      return Type.CONFIGURE;
-    }
-
-    @Override
-    public void init(ClusterTopology topology, AmbariContext ambariContext) {
-      clusterTopology = topology;
-      this.ambariContext = ambariContext;
-    }
-
-    @Override
-    public void run() {
-      LOG.info("HostRequest: Executing CONFIGURE task for host: {}", hostname);
-      ambariContext.registerHostWithConfigGroup(getHostName(), clusterTopology, getHostgroupName());
-    }
-  }
-
-  //todo: extract
-  private class InstallHostTask implements TopologyTask {
-    private ClusterTopology clusterTopology;
-    private final boolean skipFailure;
-
-    public InstallHostTask(boolean skipFailure) {
-      this.skipFailure = skipFailure;
-    }
-
-    @Override
-    public Type getType() {
-      return Type.INSTALL;
-    }
-
-    @Override
-    public void init(ClusterTopology topology, AmbariContext ambariContext) {
-      clusterTopology = topology;
-    }
-
-    @Override
-    public void run() {
-      LOG.info("HostRequest: Executing INSTALL task for host: {}", hostname);
-      boolean skipInstallTaskCreate = topology.getProvisionAction().equals(ProvisionAction.START_ONLY);
-      RequestStatusResponse response = clusterTopology.installHost(hostname, skipInstallTaskCreate, skipFailure);
-      // map logical install tasks to physical install tasks
-      List<ShortTaskStatus> underlyingTasks = response.getTasks();
-      for (ShortTaskStatus task : underlyingTasks) {
-        Long logicalInstallTaskId = logicalTaskMap.get(this).get(task.getRole());
-        if(logicalInstallTaskId == null) {
-          LOG.info("Skipping physical install task registering, because component {} cannot be found", task.getRole());
-          continue;
-        }
-        //todo: for now only one physical task per component
-        long taskId = task.getTaskId();
-        registerPhysicalTaskId(logicalInstallTaskId, taskId);
-
-        //todo: move this to provision
-        //todo: shouldn't have to iterate over all tasks to find install task
-        //todo: we are doing the same thing in the above registerPhysicalTaskId() call
-        // set attempt count on task
-        for (HostRoleCommand logicalTask : logicalTasks.values()) {
-          if (logicalTask.getTaskId() == logicalInstallTaskId) {
-            logicalTask.incrementAttemptCount();
-          }
-        }
-      }
-
-      LOG.info("HostRequest: Exiting INSTALL task for host: {}", hostname);
-    }
-  }
-
-  //todo: extract
-  private class StartHostTask implements TopologyTask {
-    private ClusterTopology clusterTopology;
-    private final boolean skipFailure;
-
-    public StartHostTask(boolean skipFailure) {
-      this.skipFailure = skipFailure;
-    }
-
-    @Override
-    public Type getType() {
-      return Type.START;
-    }
-
-    @Override
-    public void init(ClusterTopology topology, AmbariContext ambariContext) {
-      clusterTopology = topology;
-    }
-
-    @Override
-    public void run() {
-      LOG.info("HostRequest: Executing START task for host: {}", hostname);
-      RequestStatusResponse response = clusterTopology.startHost(hostname, skipFailure);
-      // map logical install tasks to physical install tasks
-      List<ShortTaskStatus> underlyingTasks = response.getTasks();
-      for (ShortTaskStatus task : underlyingTasks) {
-        String component = task.getRole();
-        Long logicalStartTaskId = logicalTaskMap.get(this).get(component);
-        if(logicalStartTaskId == null) {
-          LOG.info("Skipping physical start task registering, because component {} cannot be found", task.getRole());
-          continue;
-        }
-        // for now just set on outer map
-        registerPhysicalTaskId(logicalStartTaskId, task.getTaskId());
-
-        //todo: move this to provision
-        // set attempt count on task
-        for (HostRoleCommand logicalTask : logicalTasks.values()) {
-          if (logicalTask.getTaskId() == logicalStartTaskId) {
-            logicalTask.incrementAttemptCount();
-          }
-        }
-      }
-
-      LOG.info("HostRequest: Exiting START task for host: {}", hostname);
-    }
-  }
-
   private class HostResourceAdapter implements Resource {
     Resource hostResource;
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
index be4ab7a..912b2ff 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
@@ -48,6 +48,7 @@ import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity;
 import org.apache.ambari.server.orm.entities.TopologyRequestEntity;
 import org.apache.ambari.server.stack.NoSuchStackException;
 import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.topology.tasks.TopologyTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
index f53f04a..8e991d6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -72,6 +72,8 @@ import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.SecurityType;
 import org.apache.ambari.server.state.host.HostImpl;
 import org.apache.ambari.server.state.quicklinksprofile.QuickLinksProfile;
+import org.apache.ambari.server.topology.tasks.ConfigureClusterTask;
+import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory;
 import org.apache.ambari.server.utils.RetryHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,6 +89,9 @@ import com.google.inject.persist.Transactional;
 @Singleton
 public class TopologyManager {
 
+  /** internal token for topology related async tasks */
+  public static final String INTERNAL_AUTH_TOKEN = "internal_topology_token";
+
   public static final String INITIAL_CONFIG_TAG = "INITIAL";
   public static final String TOPOLOGY_RESOLVED_TAG = "TOPOLOGY_RESOLVED";
   public static final String KDC_ADMIN_CREDENTIAL = "kdc.admin.credential";
@@ -122,6 +127,9 @@ public class TopologyManager {
   private SecurityConfigurationFactory securityConfigurationFactory;
 
   @Inject
+  private ConfigureClusterTaskFactory configureClusterTaskFactory;
+
+  @Inject
   private AmbariEventPublisher ambariEventPublisher;
 
   @Inject
@@ -302,7 +310,6 @@ public class TopologyManager {
       }
     );
 
-
     clusterTopologyMap.put(clusterId, topology);
 
     addClusterConfigRequest(topology, new ClusterConfigurationRequest(
@@ -1030,101 +1037,15 @@ public class TopologyManager {
       LOG.debug("No timeout constraints found in configuration. Wired defaults will be applied.");
     }
 
-    ConfigureClusterTask configureClusterTask = new ConfigureClusterTask(topology, configurationRequest);
+    ConfigureClusterTask configureClusterTask = configureClusterTaskFactory.createConfigureClusterTask(topology,
+      configurationRequest);
+
     AsyncCallableService<Boolean> asyncCallableService = new AsyncCallableService<>(configureClusterTask, timeout, delay,
         Executors.newScheduledThreadPool(1));
 
     executor.submit(asyncCallableService);
   }
 
-  // package protected for testing purposes
-  static class ConfigureClusterTask implements Callable<Boolean> {
-
-    private ClusterConfigurationRequest configRequest;
-    private ClusterTopology topology;
-
-    public ConfigureClusterTask(ClusterTopology topology, ClusterConfigurationRequest configRequest) {
-      this.configRequest = configRequest;
-      this.topology = topology;
-    }
-
-    @Override
-    public Boolean call() throws Exception {
-      LOG.info("TopologyManager.ConfigureClusterTask: Entering");
-
-      Collection<String> requiredHostGroups = getTopologyRequiredHostGroups();
-
-      if (!areRequiredHostGroupsResolved(requiredHostGroups)) {
-        LOG.debug("TopologyManager.ConfigureClusterTask - prerequisites for config request processing not yet " +
-            "satisfied");
-        throw new IllegalArgumentException("TopologyManager.ConfigureClusterTask - prerequisites for config " +
-            "request processing not yet  satisfied");
-      }
-
-      try {
-          LOG.info("TopologyManager.ConfigureClusterTask: All Required host groups are completed, Cluster " +
-              "Configuration can now begin");
-          configRequest.process();
-        } catch (Exception e) {
-          LOG.error("TopologyManager.ConfigureClusterTask: " +
-              "An exception occurred while attempting to process cluster configs and set on cluster: ", e);
-
-        // this will signal an unsuccessful run, retry will be triggered if required
-        throw new Exception(e);
-      }
-
-      LOG.info("TopologyManager.ConfigureClusterTask: Exiting");
-      return true;
-    }
-
-    /**
-     * Return the set of host group names which are required for configuration topology resolution.
-     *
-     * @return set of required host group names
-     */
-    private Collection<String> getTopologyRequiredHostGroups() {
-      Collection<String> requiredHostGroups;
-      try {
-        requiredHostGroups = configRequest.getRequiredHostGroups();
-      } catch (RuntimeException e) {
-        // just log error and allow config topology update
-        LOG.error("TopologyManager.ConfigureClusterTask: An exception occurred while attempting to determine required" +
-            " host groups for config update ", e);
-        requiredHostGroups = Collections.emptyList();
-      }
-      return requiredHostGroups;
-    }
-
-    /**
-     * Determine if all hosts for the given set of required host groups are known.
-     *
-     * @param requiredHostGroups set of required host groups
-     * @return true if all required host groups are resolved
-     */
-    private boolean areRequiredHostGroupsResolved(Collection<String> requiredHostGroups) {
-      boolean configTopologyResolved = true;
-      Map<String, HostGroupInfo> hostGroupInfo = topology.getHostGroupInfo();
-      for (String hostGroup : requiredHostGroups) {
-        HostGroupInfo groupInfo = hostGroupInfo.get(hostGroup);
-        if (groupInfo == null || groupInfo.getHostNames().size() < groupInfo.getRequestedHostCount()) {
-          configTopologyResolved = false;
-          if (groupInfo != null) {
-            LOG.info("TopologyManager.ConfigureClusterTask areHostGroupsResolved: host group name = {} requires {} hosts to be mapped, but only {} are available.",
-                groupInfo.getHostGroupName(), groupInfo.getRequestedHostCount(), groupInfo.getHostNames().size());
-          } else {
-              LOG.error("TopologyManager.ConfigureClusterTask areHostGroupsResolved: host group name = {} is required group and does not map to any hosts. Use add host API to add host to this host group.",
-                  hostGroup);
-          }
-          break;
-        } else {
-          LOG.info("TopologyManager.ConfigureClusterTask areHostGroupsResolved: host group name = {} has been fully resolved, as all {} required hosts are mapped to {} physical hosts.",
-              groupInfo.getHostGroupName(), groupInfo.getRequestedHostCount(), groupInfo.getHostNames().size());
-        }
-      }
-      return configTopologyResolved;
-    }
-  }
-
   /**
    *
    * Removes a host from the available hosts when the host gets deleted.

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyTask.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyTask.java
deleted file mode 100644
index ef39896..0000000
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyTask.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distribut
- * ed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.server.topology;
-
-/**
- * Task which is executed by the TopologyManager.
- */
-public interface TopologyTask extends Runnable {
-  /**
-   * Task type.
-   */
-  public enum Type {
-    RESOURCE_CREATION,
-    CONFIGURE,
-    INSTALL,
-    START
-  }
-
-  /**
-   * injection of topology and ambari context
-   */
-  public void init(ClusterTopology topology, AmbariContext ambariContext);
-
-  /**
-   * Get the task type.
-   *
-   * @return the type of task
-   */
-  public Type getType();
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
new file mode 100644
index 0000000..19d99ad
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.topology.tasks;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.ambari.server.security.authorization.internal.RunWithInternalSecurityContext;
+import org.apache.ambari.server.topology.ClusterConfigurationRequest;
+import org.apache.ambari.server.topology.ClusterTopology;
+import org.apache.ambari.server.topology.HostGroupInfo;
+import org.apache.ambari.server.topology.TopologyManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.AssistedInject;
+
+public class ConfigureClusterTask implements Callable<Boolean> {
+
+  private static Logger LOG = LoggerFactory.getLogger(ConfigureClusterTask.class);
+
+  private ClusterConfigurationRequest configRequest;
+  private ClusterTopology topology;
+
+  @AssistedInject
+  public ConfigureClusterTask(@Assisted ClusterTopology topology, @Assisted ClusterConfigurationRequest configRequest) {
+    this.configRequest = configRequest;
+    this.topology = topology;
+  }
+
+  @Override
+  @RunWithInternalSecurityContext(token = TopologyManager.INTERNAL_AUTH_TOKEN)
+  public Boolean call() throws Exception {
+    LOG.info("TopologyManager.ConfigureClusterTask: Entering");
+
+    Collection<String> requiredHostGroups = getTopologyRequiredHostGroups();
+
+    if (!areRequiredHostGroupsResolved(requiredHostGroups)) {
+      LOG.debug("TopologyManager.ConfigureClusterTask - prerequisites for config request processing not yet " +
+        "satisfied");
+      throw new IllegalArgumentException("TopologyManager.ConfigureClusterTask - prerequisites for config " +
+        "request processing not yet  satisfied");
+    }
+
+    try {
+      LOG.info("TopologyManager.ConfigureClusterTask: All Required host groups are completed, Cluster " +
+        "Configuration can now begin");
+      configRequest.process();
+    } catch (Exception e) {
+      LOG.error("TopologyManager.ConfigureClusterTask: " +
+        "An exception occurred while attempting to process cluster configs and set on cluster: ", e);
+
+      // this will signal an unsuccessful run, retry will be triggered if required
+      throw new Exception(e);
+    }
+
+    LOG.info("TopologyManager.ConfigureClusterTask: Exiting");
+    return true;
+  }
+
+  /**
+   * Return the set of host group names which are required for configuration topology resolution.
+   *
+   * @return set of required host group names
+   */
+  private Collection<String> getTopologyRequiredHostGroups() {
+    Collection<String> requiredHostGroups;
+    try {
+      requiredHostGroups = configRequest.getRequiredHostGroups();
+    } catch (RuntimeException e) {
+      // just log error and allow config topology update
+      LOG.error("TopologyManager.ConfigureClusterTask: An exception occurred while attempting to determine required" +
+        " host groups for config update ", e);
+      requiredHostGroups = Collections.emptyList();
+    }
+    return requiredHostGroups;
+  }
+
+  /**
+   * Determine if all hosts for the given set of required host groups are known.
+   *
+   * @param requiredHostGroups set of required host groups
+   * @return true if all required host groups are resolved
+   */
+  private boolean areRequiredHostGroupsResolved(Collection<String> requiredHostGroups) {
+    boolean configTopologyResolved = true;
+    Map<String, HostGroupInfo> hostGroupInfo = topology.getHostGroupInfo();
+    for (String hostGroup : requiredHostGroups) {
+      HostGroupInfo groupInfo = hostGroupInfo.get(hostGroup);
+      if (groupInfo == null || groupInfo.getHostNames().size() < groupInfo.getRequestedHostCount()) {
+        configTopologyResolved = false;
+        if (groupInfo != null) {
+          LOG.info("TopologyManager.ConfigureClusterTask areHostGroupsResolved: host group name = {} requires {} hosts to be mapped, but only {} are available.",
+            groupInfo.getHostGroupName(), groupInfo.getRequestedHostCount(), groupInfo.getHostNames().size());
+        }
+        break;
+      } else {
+        LOG.info("TopologyManager.ConfigureClusterTask areHostGroupsResolved: host group name = {} has been fully resolved, as all {} required hosts are mapped to {} physical hosts.",
+          groupInfo.getHostGroupName(), groupInfo.getRequestedHostCount(), groupInfo.getHostNames().size());
+      }
+    }
+    return configTopologyResolved;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTaskFactory.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTaskFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTaskFactory.java
new file mode 100644
index 0000000..0287103
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTaskFactory.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology.tasks;
+
+import org.apache.ambari.server.topology.ClusterConfigurationRequest;
+import org.apache.ambari.server.topology.ClusterTopology;
+
+
+public interface ConfigureClusterTaskFactory {
+
+   ConfigureClusterTask createConfigureClusterTask(ClusterTopology topology, ClusterConfigurationRequest
+    configRequest);
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/InstallHostTask.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/InstallHostTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/InstallHostTask.java
new file mode 100644
index 0000000..f38022a
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/InstallHostTask.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.topology.tasks;
+
+import java.util.List;
+
+import org.apache.ambari.server.controller.RequestStatusResponse;
+import org.apache.ambari.server.controller.ShortTaskStatus;
+import org.apache.ambari.server.controller.internal.ProvisionAction;
+import org.apache.ambari.server.topology.ClusterTopology;
+import org.apache.ambari.server.topology.HostRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.AssistedInject;
+
+public class InstallHostTask extends TopologyHostTask {
+
+  private final static Logger LOG = LoggerFactory.getLogger(InstallHostTask.class);
+
+  @AssistedInject
+  public InstallHostTask(@Assisted ClusterTopology topology, @Assisted HostRequest hostRequest, @Assisted boolean skipFailure) {
+    super(topology, hostRequest);
+    this.skipFailure = skipFailure;
+  }
+
+  @Override
+  public Type getType() {
+    return Type.INSTALL;
+  }
+
+  @Override
+  public void runTask() {
+    LOG.info("HostRequest: Executing INSTALL task for host: {}", hostRequest.getHostName());
+    boolean skipInstallTaskCreate = clusterTopology.getProvisionAction().equals(ProvisionAction.START_ONLY);
+    RequestStatusResponse response = clusterTopology.installHost(hostRequest.getHostName(), skipInstallTaskCreate, skipFailure);
+    // map logical install tasks to physical install tasks
+    List<ShortTaskStatus> underlyingTasks = response.getTasks();
+    for (ShortTaskStatus task : underlyingTasks) {
+
+      String component = task.getRole();
+      Long logicalInstallTaskId = hostRequest.getLogicalTasksForTopologyTask(this).get(component);
+      if(logicalInstallTaskId == null) {
+        LOG.info("Skipping physical install task registering, because component {} cannot be found", task.getRole());
+        continue;
+      }
+      //todo: for now only one physical task per component
+      long taskId = task.getTaskId();
+      hostRequest.registerPhysicalTaskId(logicalInstallTaskId, taskId);
+    }
+
+    LOG.info("HostRequest: Exiting INSTALL task for host: {}", hostRequest.getHostName());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/PersistHostResourcesTask.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/PersistHostResourcesTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/PersistHostResourcesTask.java
new file mode 100644
index 0000000..0730fe8
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/PersistHostResourcesTask.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.topology.tasks;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.ambari.server.topology.ClusterTopology;
+import org.apache.ambari.server.topology.HostGroup;
+import org.apache.ambari.server.topology.HostRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.AssistedInject;
+
+public class PersistHostResourcesTask extends TopologyHostTask  {
+
+  private final static Logger LOG = LoggerFactory.getLogger(PersistHostResourcesTask.class);
+
+  @AssistedInject
+  public PersistHostResourcesTask(@Assisted ClusterTopology topology, @Assisted HostRequest hostRequest) {
+    super(topology, hostRequest);
+  }
+
+  @Override
+  public Type getType() {
+    return Type.RESOURCE_CREATION;
+  }
+
+  @Override
+  public void runTask() {
+    LOG.info("HostRequest: Executing RESOURCE_CREATION task for host: {}", hostRequest.getHostName());
+    HostGroup group = hostRequest.getHostGroup();
+    Map<String, Collection<String>> serviceComponents = new HashMap<String, Collection<String>>();
+    for (String service : group.getServices()) {
+      serviceComponents.put(service, new HashSet<String>(group.getComponents(service)));
+    }
+    clusterTopology.getAmbariContext().createAmbariHostResources(hostRequest.getClusterId(),
+      hostRequest.getHostName(), serviceComponents);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/RegisterWithConfigGroupTask.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/RegisterWithConfigGroupTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/RegisterWithConfigGroupTask.java
new file mode 100644
index 0000000..029f2a4
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/RegisterWithConfigGroupTask.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.topology.tasks;
+
+import org.apache.ambari.server.topology.ClusterTopology;
+import org.apache.ambari.server.topology.HostRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.AssistedInject;
+
+
+public class RegisterWithConfigGroupTask extends TopologyHostTask {
+
+  private final static Logger LOG = LoggerFactory.getLogger(RegisterWithConfigGroupTask.class);
+
+  @AssistedInject
+  public RegisterWithConfigGroupTask(@Assisted ClusterTopology topology, @Assisted HostRequest hostRequest) {
+    super(topology, hostRequest);
+  }
+
+  @Override
+  public Type getType() {
+    return Type.CONFIGURE;
+  }
+
+  @Override
+  public void runTask() {
+    LOG.info("HostRequest: Executing CONFIGURE task for host: {}", hostRequest.getHostName());
+    clusterTopology.getAmbariContext().registerHostWithConfigGroup(hostRequest.getHostName(), clusterTopology,
+      hostRequest.getHostgroupName());
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/StartHostTask.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/StartHostTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/StartHostTask.java
new file mode 100644
index 0000000..054ed1e
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/StartHostTask.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.topology.tasks;
+
+import java.util.List;
+
+import org.apache.ambari.server.controller.RequestStatusResponse;
+import org.apache.ambari.server.controller.ShortTaskStatus;
+import org.apache.ambari.server.topology.ClusterTopology;
+import org.apache.ambari.server.topology.HostRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.AssistedInject;
+
+public class StartHostTask extends TopologyHostTask {
+
+  private final static Logger LOG = LoggerFactory.getLogger(StartHostTask.class);
+
+  @AssistedInject
+  public StartHostTask(@Assisted ClusterTopology topology, @Assisted HostRequest hostRequest, @Assisted boolean skipFailure) {
+    super(topology, hostRequest);
+    this.skipFailure = skipFailure;
+  }
+
+  @Override
+  public Type getType() {
+    return Type.START;
+  }
+
+  @Override
+  public void runTask() {
+    LOG.info("HostRequest: Executing START task for host: {}", hostRequest.getHostName());
+    RequestStatusResponse response = clusterTopology.startHost(hostRequest.getHostName(), skipFailure);
+    // map logical start tasks to physical start tasks
+    List<ShortTaskStatus> underlyingTasks = response.getTasks();
+    for (ShortTaskStatus task : underlyingTasks) {
+
+      String component = task.getRole();
+      Long logicalStartTaskId = hostRequest.getLogicalTasksForTopologyTask(this).get(component);
+      if(logicalStartTaskId == null) {
+        LOG.info("Skipping physical start task registering, because component {} cannot be found", task.getRole());
+        continue;
+      }
+      // for now just set on outer map
+      hostRequest.registerPhysicalTaskId(logicalStartTaskId, task.getTaskId());
+    }
+
+    LOG.info("HostRequest: Exiting START task for host: {}", hostRequest.getHostName());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyHostTask.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyHostTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyHostTask.java
new file mode 100644
index 0000000..82a2f6e
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyHostTask.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.topology.tasks;
+
+import org.apache.ambari.server.security.authorization.internal.InternalAuthenticationToken;
+import org.apache.ambari.server.topology.ClusterTopology;
+import org.apache.ambari.server.topology.HostRequest;
+import org.apache.ambari.server.topology.TopologyManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
+
+public abstract class TopologyHostTask implements TopologyTask {
+
+  private static Logger LOG = LoggerFactory.getLogger(TopologyHostTask.class);
+
+  ClusterTopology clusterTopology;
+  HostRequest hostRequest;
+  boolean skipFailure;
+
+  public TopologyHostTask(ClusterTopology topology, HostRequest hostRequest) {
+    this.clusterTopology = topology;
+    this.hostRequest = hostRequest;
+  }
+
+  /**
+   * Runs the task with an InternalAuthenticationToken, because there might not be an active security context when these tasks are executed.
+   */
+  public void run() {
+    Authentication savedAuthContext = SecurityContextHolder.getContext().getAuthentication();
+    try {
+      InternalAuthenticationToken authenticationToken = new InternalAuthenticationToken(TopologyManager.INTERNAL_AUTH_TOKEN);
+      authenticationToken.setAuthenticated(true);
+      SecurityContextHolder.getContext().setAuthentication(authenticationToken);
+      runTask();
+    } finally {
+      SecurityContextHolder.getContext().setAuthentication(savedAuthContext);
+    }
+  }
+
+  public abstract void runTask();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyTask.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyTask.java
new file mode 100644
index 0000000..0753c3d
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyTask.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed
+ * on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology.tasks;
+
+/**
+ * Task which is executed by the TopologyManager.
+ */
+public interface TopologyTask extends Runnable {
+  /**
+   * Task type.
+   */
+  public enum Type {
+    RESOURCE_CREATION,
+    CONFIGURE,
+    INSTALL,
+    START
+  }
+
+  /**
+   * Get the task type.
+   *
+   * @return the type of task
+   */
+  public Type getType();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/test/java/org/apache/ambari/server/agent/AgentResourceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/AgentResourceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/AgentResourceTest.java
index 8ae192b..3f62366 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/agent/AgentResourceTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/AgentResourceTest.java
@@ -73,6 +73,7 @@ import org.apache.ambari.server.state.scheduler.RequestExecutionImpl;
 import org.apache.ambari.server.state.stack.OsFamily;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostImpl;
 import org.apache.ambari.server.topology.PersistedState;
+import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.codehaus.jettison.json.JSONException;
@@ -346,6 +347,8 @@ public class AgentResourceTest extends RandomPortJerseyTest {
         RequestExecutionImpl.class).build(RequestExecutionFactory.class));
       install(new FactoryModuleBuilder().build(StageFactory.class));
       install(new FactoryModuleBuilder().build(ExecutionCommandWrapperFactory.class));
+      install(new FactoryModuleBuilder().build(ConfigureClusterTaskFactory.class));
+
 
       bind(HostRoleCommandFactory.class).to(HostRoleCommandFactoryImpl.class);
       bind(SecurityHelper.class).toInstance(SecurityHelperImpl.getInstance());

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java
index 97d560f..c6cef26 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java
@@ -78,7 +78,7 @@ import org.apache.ambari.server.topology.HostRequest;
 import org.apache.ambari.server.topology.LogicalRequest;
 import org.apache.ambari.server.topology.PersistedState;
 import org.apache.ambari.server.topology.TopologyRequest;
-import org.apache.ambari.server.topology.TopologyTask;
+import org.apache.ambari.server.topology.tasks.TopologyTask;
 import org.apache.ambari.server.utils.EventBusSynchronizer;
 import org.junit.After;
 import org.junit.Before;

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java
index 748b4e9..af3fc08 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java
@@ -66,6 +66,7 @@ import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.ComponentInfo;
 import org.apache.ambari.server.state.SecurityType;
+import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory;
 import org.easymock.Capture;
 import org.easymock.EasyMockRule;
 import org.easymock.EasyMockSupport;
@@ -154,6 +155,8 @@ public class ClusterDeployWithStartOnlyTest {
   private ComponentInfo serviceComponentInfo;
   @Mock(type = MockType.NICE)
   private ComponentInfo clientComponentInfo;
+  @Mock(type = MockType.NICE)
+  private ConfigureClusterTaskFactory configureClusterTaskFactory;
 
   @Mock(type = MockType.STRICT)
   private Future mockFuture;

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java
index a1f3d25..09a6aa2 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java
@@ -67,6 +67,7 @@ import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.ComponentInfo;
 import org.apache.ambari.server.state.SecurityType;
+import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory;
 import org.easymock.Capture;
 import org.easymock.EasyMockRule;
 import org.easymock.EasyMockSupport;
@@ -144,6 +145,8 @@ public class ClusterInstallWithoutStartOnComponentLevelTest {
   private Cluster cluster;
   @Mock(type = MockType.NICE)
   private HostRoleCommand hostRoleCommand;
+  @Mock(type = MockType.NICE)
+  private ConfigureClusterTaskFactory configureClusterTaskFactory;
 
 
   @Mock(type = MockType.NICE)

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java
index 33f318a..44cc9f7 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java
@@ -67,6 +67,7 @@ import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.ComponentInfo;
 import org.apache.ambari.server.state.SecurityType;
+import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory;
 import org.easymock.Capture;
 import org.easymock.EasyMockRule;
 import org.easymock.EasyMockSupport;
@@ -144,6 +145,8 @@ public class ClusterInstallWithoutStartTest {
   private Cluster cluster;
   @Mock(type = MockType.NICE)
   private HostRoleCommand hostRoleCommand;
+  @Mock(type = MockType.NICE)
+  private ConfigureClusterTaskFactory configureClusterTaskFactory;
 
 
   @Mock(type = MockType.NICE)

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
index aa7ba0e..e9198fb 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Executors;
 
+import org.apache.ambari.server.topology.tasks.ConfigureClusterTask;
 import org.easymock.EasyMockRule;
 import org.easymock.Mock;
 import org.easymock.MockType;
@@ -59,14 +60,12 @@ public class ConfigureClusterTaskTest {
   @Mock(type = MockType.STRICT)
   private ClusterTopology clusterTopology;
 
-  private TopologyManager.ConfigureClusterTask testSubject;
-
+  private ConfigureClusterTask testSubject;
 
   @Before
   public void before() {
     reset(clusterConfigurationRequest, clusterTopology);
-    testSubject = new TopologyManager.ConfigureClusterTask(clusterTopology, clusterConfigurationRequest);
-
+    testSubject = new ConfigureClusterTask(clusterTopology, clusterConfigurationRequest);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
index 4f087f0..469617c 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
@@ -65,6 +65,7 @@ import org.apache.ambari.server.security.encryption.CredentialStoreService;
 import org.apache.ambari.server.stack.NoSuchStackException;
 import org.apache.ambari.server.state.SecurityType;
 import org.apache.ambari.server.state.quicklinksprofile.QuickLinksProfile;
+import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRule;
@@ -151,6 +152,8 @@ public class TopologyManagerTest {
   private SettingDAO settingDAO;
   @Mock(type = MockType.NICE)
   private ClusterTopology clusterTopologyMock;
+  @Mock(type = MockType.NICE)
+  private ConfigureClusterTaskFactory configureClusterTaskFactory;
 
 
   @Mock(type = MockType.STRICT)

http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/test/java/org/apache/ambari/server/utils/StageUtilsTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/utils/StageUtilsTest.java b/ambari-server/src/test/java/org/apache/ambari/server/utils/StageUtilsTest.java
index 5c77831..cf7ff7f 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/utils/StageUtilsTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/utils/StageUtilsTest.java
@@ -80,6 +80,7 @@ import org.apache.ambari.server.state.host.HostFactory;
 import org.apache.ambari.server.state.stack.OsFamily;
 import org.apache.ambari.server.topology.PersistedState;
 import org.apache.ambari.server.topology.TopologyManager;
+import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory;
 import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.map.JsonMappingException;
 import org.easymock.EasyMockSupport;
@@ -130,6 +131,7 @@ public class StageUtilsTest extends EasyMockSupport {
 
         install(new FactoryModuleBuilder().build(ExecutionCommandWrapperFactory.class));
         install(new FactoryModuleBuilder().implement(Config.class, ConfigImpl.class).build(ConfigFactory.class));
+        install(new FactoryModuleBuilder().build(ConfigureClusterTaskFactory.class));
       }
     });
 


Mime
View raw message