Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 16277200BBF for ; Mon, 14 Nov 2016 19:27:21 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 14D8A160AF4; Mon, 14 Nov 2016 18:27:21 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B2E1C160B1E for ; Mon, 14 Nov 2016 19:27:17 +0100 (CET) Received: (qmail 25829 invoked by uid 500); 14 Nov 2016 18:27:15 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 25016 invoked by uid 99); 14 Nov 2016 18:27:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 14 Nov 2016 18:27:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 60C59E1171; Mon, 14 Nov 2016 18:27:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ncole@apache.org To: commits@ambari.apache.org Date: Mon, 14 Nov 2016 18:27:44 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [31/50] [abbrv] ambari git commit: AMBARI-18569. Execute topology tasks in parallel by hosts. (Attila Doroszlai via stoader) archived-at: Mon, 14 Nov 2016 18:27:21 -0000 AMBARI-18569. Execute topology tasks in parallel by hosts. (Attila Doroszlai via stoader) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5dea886e Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5dea886e Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5dea886e Branch: refs/heads/branch-dev-patch-upgrade Commit: 5dea886efde58fb93033f705b5dd43b5ccd72ad6 Parents: 2ddd7a3 Author: Attila Doroszlai Authored: Fri Nov 11 12:22:04 2016 +0100 Committer: Toader, Sebastian Committed: Fri Nov 11 12:22:04 2016 +0100 ---------------------------------------------------------------------- .../server/configuration/Configuration.java | 37 ++++++++++++++- .../AmbariManagementControllerImpl.java | 20 ++++---- .../internal/ComponentResourceProvider.java | 4 +- .../internal/HostComponentResourceProvider.java | 2 +- .../internal/ServiceResourceProvider.java | 4 +- .../ambari/server/orm/entities/StackEntity.java | 10 +++- .../server/topology/HostOfferResponse.java | 46 ++++++++++++------ .../ambari/server/topology/HostRequest.java | 18 +++---- .../ambari/server/topology/LogicalRequest.java | 13 +++--- .../ambari/server/topology/TopologyManager.java | 49 ++++++++++++++------ .../server/configuration/ConfigurationTest.java | 6 +-- 11 files changed, 143 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java index 9d2243b..15f186b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -100,7 +100,6 @@ import com.google.gson.JsonPrimitive; import com.google.inject.Inject; import com.google.inject.Singleton; - /** * The {@link Configuration} class is used to read from the * {{ambari.properties}} file and manage/expose the configuration properties. @@ -2481,6 +2480,22 @@ public class Configuration { public static final ConfigurationProperty LOG4JMONITOR_DELAY = new ConfigurationProperty<>( "log4j.monitor.delay", TimeUnit.MINUTES.toMillis(5)); + /** + * Indicates whether parallel topology task creation is enabled for blueprint cluster provisioning. + * Defaults to false. + * @see #TOPOLOGY_TASK_PARALLEL_CREATION_THREAD_COUNT + */ + @Markdown(description = "Indicates whether parallel topology task creation is enabled") + public static final ConfigurationProperty TOPOLOGY_TASK_PARALLEL_CREATION_ENABLED = new ConfigurationProperty<>("topology.task.creation.parallel", Boolean.FALSE); + + /** + * The number of threads to use for parallel topology task creation in blueprint cluster provisioning if enabled. + * Defaults to 10. + * @see #TOPOLOGY_TASK_PARALLEL_CREATION_ENABLED + */ + @Markdown(description = "The number of threads to use for parallel topology task creation if enabled") + public static final ConfigurationProperty TOPOLOGY_TASK_PARALLEL_CREATION_THREAD_COUNT = new ConfigurationProperty<>("topology.task.creation.parallel.threads", 10); + private static final Logger LOG = LoggerFactory.getLogger( Configuration.class); @@ -5168,6 +5183,24 @@ public class Configuration { } /** + * @return the number of threads to use for parallel topology task creation if enabled + */ + public int getParallelTopologyTaskCreationThreadPoolSize() { + try { + return Integer.parseInt(getProperty(TOPOLOGY_TASK_PARALLEL_CREATION_THREAD_COUNT)); + } catch (NumberFormatException e) { + return TOPOLOGY_TASK_PARALLEL_CREATION_THREAD_COUNT.getDefaultValue(); + } + } + + /** + * @return true if parallel execution of task creation is enabled explicitly + */ + public boolean isParallelTopologyTaskCreationEnabled() { + return Boolean.parseBoolean(getProperty(TOPOLOGY_TASK_PARALLEL_CREATION_ENABLED)); + } + + /** * Generates a markdown table which includes: *
    *
  • Property key name
  • http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java index 09e49ef..b04fdd7 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java @@ -2164,16 +2164,16 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle commandParams.putAll(commandParamsInp); } - //Propogate HCFS service type info - Iterator it = cluster.getServices().values().iterator(); - while(it.hasNext()) { - ServiceInfo serviceInfoInstance = ambariMetaInfo.getService(stackId.getStackName(),stackId.getStackVersion(), it.next().getName()); - LOG.info("Iterating service type Instance in createHostAction:: " + serviceInfoInstance.getName()); - if(serviceInfoInstance.getServiceType() != null) { - LOG.info("Adding service type info in createHostAction:: " + serviceInfoInstance.getServiceType()); - commandParams.put("dfs_type",serviceInfoInstance.getServiceType()); - break; - } + // Propagate HCFS service type info + for (Service service : cluster.getServices().values()) { + ServiceInfo serviceInfoInstance = ambariMetaInfo.getService(stackId.getStackName(),stackId.getStackVersion(), service.getName()); + LOG.debug("Iterating service type Instance in createHostAction: {}", serviceInfoInstance.getName()); + String serviceType = serviceInfoInstance.getServiceType(); + if (serviceType != null) { + LOG.info("Adding service type info in createHostAction: {}", serviceType); + commandParams.put("dfs_type", serviceType); + break; + } } boolean isInstallCommand = roleCommand.equals(RoleCommand.INSTALL); http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ComponentResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ComponentResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ComponentResourceProvider.java index 241a48f..453c688 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ComponentResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ComponentResourceProvider.java @@ -266,7 +266,7 @@ public class ComponentResourceProvider extends AbstractControllerResourceProvide } // Create the components for the given requests. - public synchronized void createComponents(Set requests) + public void createComponents(Set requests) throws AmbariException, AuthorizationException { if (requests.isEmpty()) { @@ -473,7 +473,7 @@ public class ComponentResourceProvider extends AbstractControllerResourceProvide } // Update the components for the given requests. - protected synchronized RequestStatusResponse updateComponents(Set requests, + protected RequestStatusResponse updateComponents(Set requests, Map requestProperties, boolean runSmokeTest) throws AmbariException, AuthorizationException { http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java index 87eb266..c8ec08b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java @@ -474,7 +474,7 @@ public class HostComponentResourceProvider extends AbstractControllerResourcePro //todo: This was moved from AmbariManagementController and needs a lot of refactoring. //todo: Look into using the predicate instead of Set //todo: change to private access when all AMC tests have been moved. - protected synchronized RequestStageContainer updateHostComponents(RequestStageContainer stages, + protected RequestStageContainer updateHostComponents(RequestStageContainer stages, Set requests, Map requestProperties, boolean runSmokeTest) throws AmbariException, AuthorizationException { http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java index a08d153..0d5c174 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java @@ -343,7 +343,7 @@ public class ServiceResourceProvider extends AbstractControllerResourceProvider } // Create services from the given request. - public synchronized void createServices(Set requests) + public void createServices(Set requests) throws AmbariException, AuthorizationException { if (requests.isEmpty()) { @@ -461,7 +461,7 @@ public class ServiceResourceProvider extends AbstractControllerResourceProvider } // Update services based on the given requests. - protected synchronized RequestStageContainer updateServices(RequestStageContainer requestStages, Set requests, + protected RequestStageContainer updateServices(RequestStageContainer requestStages, Set requests, Map requestProperties, boolean runSmokeTest, boolean reconfigureClients, boolean startDependencies) throws AmbariException, AuthorizationException { http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StackEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StackEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StackEntity.java index c425969..2cfe07c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StackEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StackEntity.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -24,6 +24,7 @@ import javax.persistence.GenerationType; import javax.persistence.Id; import javax.persistence.NamedQueries; import javax.persistence.NamedQuery; +import javax.persistence.QueryHint; import javax.persistence.Table; import javax.persistence.TableGenerator; import javax.persistence.UniqueConstraint; @@ -39,7 +40,12 @@ import javax.persistence.UniqueConstraint; @TableGenerator(name = "stack_id_generator", table = "ambari_sequences", pkColumnName = "sequence_name", valueColumnName = "sequence_value", pkColumnValue = "stack_id_seq", initialValue = 0) @NamedQueries({ @NamedQuery(name = "StackEntity.findAll", query = "SELECT stack FROM StackEntity stack"), - @NamedQuery(name = "StackEntity.findByNameAndVersion", query = "SELECT stack FROM StackEntity stack WHERE stack.stackName = :stackName AND stack.stackVersion = :stackVersion") }) + @NamedQuery(name = "StackEntity.findByNameAndVersion", query = "SELECT stack FROM StackEntity stack WHERE stack.stackName = :stackName AND stack.stackVersion = :stackVersion", + hints = { + @QueryHint(name = "eclipselink.query-results-cache", value = "true"), + @QueryHint(name = "eclipselink.query-results-cache.size", value = "100") + }) +}) public class StackEntity { @Id http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java index 2932581..495aea6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,30 +19,37 @@ package org.apache.ambari.server.topology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.List; +import java.util.concurrent.Executor; /** * Response to a host offer. */ -public class HostOfferResponse { +final class HostOfferResponse { + public enum Answer {ACCEPTED, DECLINED_PREDICATE, DECLINED_DONE} + private static final Logger LOG = LoggerFactory.getLogger(HostOfferResponse.class); + static final HostOfferResponse DECLINED_DUE_TO_PREDICATE = new HostOfferResponse(Answer.DECLINED_PREDICATE); + static final HostOfferResponse DECLINED_DUE_TO_DONE = new HostOfferResponse(Answer.DECLINED_DONE); + private final Answer answer; private final String hostGroupName; private final long hostRequestId; private final List tasks; - public HostOfferResponse(Answer answer) { - if (answer == Answer.ACCEPTED) { - throw new IllegalArgumentException("For accepted response, hostgroup name and tasks must be set"); - } - this.answer = answer; - this.hostRequestId = -1; - this.hostGroupName = null; - this.tasks = null; + static HostOfferResponse createAcceptedResponse(long hostRequestId, String hostGroupName, List tasks) { + return new HostOfferResponse(Answer.ACCEPTED, hostRequestId, hostGroupName, tasks); + } + + private HostOfferResponse(Answer answer) { + this(answer, -1, null, null); } - public HostOfferResponse(Answer answer, long hostRequestId, String hostGroupName, List tasks) { + private HostOfferResponse(Answer answer, long hostRequestId, String hostGroupName, List tasks) { this.answer = answer; this.hostRequestId = hostRequestId; this.hostGroupName = hostGroupName; @@ -63,7 +70,20 @@ public class HostOfferResponse { return hostGroupName; } - public List getTasks() { - return tasks; + void executeTasks(Executor executor, final String hostName, final ClusterTopology topology, final AmbariContext ambariContext) { + if (answer != Answer.ACCEPTED) { + LOG.warn("Attempted to execute tasks for declined host offer", answer); + } else { + executor.execute(new Runnable() { + @Override + public void run() { + for (TopologyTask task : tasks) { + LOG.info("Running task for accepted host offer for hostname = {}, task = {}", hostName, task.getType()); + task.init(topology, ambariContext); + task.run(); + } + } + }); + } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java index 6a65b48..a18999b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -134,15 +134,15 @@ public class HostRequest implements Comparable { //todo: synchronization public synchronized HostOfferResponse offer(HostImpl host) { if (!isOutstanding) { - return new HostOfferResponse(HostOfferResponse.Answer.DECLINED_DONE); + return HostOfferResponse.DECLINED_DUE_TO_DONE; } if (matchesHost(host)) { isOutstanding = false; hostname = host.getHostName(); setHostOnTasks(host); - return new HostOfferResponse(HostOfferResponse.Answer.ACCEPTED, id, hostGroup.getName(), topologyTasks); + return HostOfferResponse.createAcceptedResponse(id, hostGroup.getName(), topologyTasks); } else { - return new HostOfferResponse(HostOfferResponse.Answer.DECLINED_PREDICATE); + return HostOfferResponse.DECLINED_DUE_TO_PREDICATE; } } @@ -466,6 +466,7 @@ public class HostRequest implements Comparable { @Override public void run() { + LOG.info("HostRequest: Executing RESOURCE_CREATION task for host: {}", hostname); HostGroup group = topology.getBlueprint().getHostGroup(getHostgroupName()); Map> serviceComponents = new HashMap>(); for (String service : group.getServices()) { @@ -492,6 +493,7 @@ public class HostRequest implements Comparable { @Override public void run() { + LOG.info("HostRequest: Executing CONFIGURE task for host: {}", hostname); ambariContext.registerHostWithConfigGroup(getHostName(), clusterTopology, getHostgroupName()); } } @@ -517,7 +519,7 @@ public class HostRequest implements Comparable { @Override public void run() { - LOG.info("HostRequest.InstallHostTask: Executing INSTALL task for host: " + hostname); + LOG.info("HostRequest: Executing INSTALL task for host: {}", hostname); boolean skipInstallTaskCreate = topology.getProvisionAction().equals(ProvisionAction.START_ONLY); RequestStatusResponse response = clusterTopology.installHost(hostname, skipInstallTaskCreate, skipFailure); // map logical install tasks to physical install tasks @@ -543,7 +545,7 @@ public class HostRequest implements Comparable { } } - LOG.info("HostRequest.InstallHostTask: Exiting INSTALL task for host: " + hostname); + LOG.info("HostRequest: Exiting INSTALL task for host: {}", hostname); } } @@ -568,7 +570,7 @@ public class HostRequest implements Comparable { @Override public void run() { - LOG.info("HostRequest.StartHostTask: Executing START task for host: " + hostname); + LOG.info("HostRequest: Executing START task for host: {}", hostname); RequestStatusResponse response = clusterTopology.startHost(hostname, skipFailure); // map logical install tasks to physical install tasks List underlyingTasks = response.getTasks(); @@ -591,7 +593,7 @@ public class HostRequest implements Comparable { } } - LOG.info("HostRequest.StartHostTask: Exiting START task for host: " + hostname); + LOG.info("HostRequest: Exiting START task for host: {}", hostname); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java index 3aaf589..0039e35 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -121,7 +121,7 @@ public class LogicalRequest extends Request { //todo: prioritization of master host requests Iterator hostRequestIterator = outstandingHostRequests.iterator(); while (hostRequestIterator.hasNext()) { - LOG.info("LogicalRequest.offer: attempting to match a request to a request for a reserved host to hostname = {}", host.getHostName()); + LOG.info("LogicalRequest.offer: attempting to match a request to a request for a non-reserved host to hostname = {}", host.getHostName()); HostOfferResponse response = hostRequestIterator.next().offer(host); switch (response.getAnswer()) { case ACCEPTED: @@ -132,23 +132,22 @@ public class LogicalRequest extends Request { //todo: should have been done on ACCEPT hostRequestIterator.remove(); LOG.info("LogicalRequest.offer: host request returned DECLINED_DONE for hostname = {}, host request has been removed from list", host.getHostName()); + break; case DECLINED_PREDICATE: LOG.info("LogicalRequest.offer: host request returned DECLINED_PREDICATE for hostname = {}", host.getHostName()); predicateRejected = true; + break; } } LOG.info("LogicalRequest.offer: outstandingHost request list size = " + outstandingHostRequests.size()); } - - - // if at least one outstanding host request rejected for predicate or we have an outstanding request // with a reserved host decline due to predicate, otherwise decline due to all hosts being resolved return predicateRejected || ! requestsWithReservedHosts.isEmpty() ? - new HostOfferResponse(HostOfferResponse.Answer.DECLINED_PREDICATE) : - new HostOfferResponse(HostOfferResponse.Answer.DECLINED_DONE); + HostOfferResponse.DECLINED_DUE_TO_PREDICATE : + HostOfferResponse.DECLINED_DUE_TO_DONE; } @Override http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java index bba0325..341633e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -38,6 +39,7 @@ import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.actionmanager.HostRoleCommand; import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorBlueprintProcessor; +import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.AmbariServer; import org.apache.ambari.server.controller.RequestStatusResponse; import org.apache.ambari.server.controller.ShortTaskStatus; @@ -86,7 +88,9 @@ public class TopologyManager { private static final String CLUSTER_CONFIG_TASK_MAX_TIME_IN_MILLIS_PROPERTY_NAME = "cluster_configure_task_timeout"; private PersistedState persistedState; - private ExecutorService executor = Executors.newSingleThreadExecutor(); + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + private final Executor taskExecutor; // executes TopologyTasks + private final boolean parallelTaskCreationEnabled; private Collection hostsToIgnore = new HashSet(); private final List availableHosts = new LinkedList(); private final Map reservedHosts = new HashMap(); @@ -127,8 +131,18 @@ public class TopologyManager { */ private Map clusterProvisionWithBlueprintCreationFinished = new HashMap<>(); - public TopologyManager(){ + public TopologyManager() { + parallelTaskCreationEnabled = false; + taskExecutor = executor; + } + @Inject + public TopologyManager(Configuration configuration) { + int threadPoolSize = configuration.getParallelTopologyTaskCreationThreadPoolSize(); + parallelTaskCreationEnabled = configuration.isParallelTopologyTaskCreationEnabled() && threadPoolSize > 1; + taskExecutor = parallelTaskCreationEnabled + ? Executors.newFixedThreadPool(threadPoolSize) + : executor; } @Inject @@ -691,7 +705,7 @@ public class TopologyManager { return logicalRequest; } - private void processAcceptedHostOffer(ClusterTopology topology, final HostOfferResponse response, final HostImpl host) { + private void processAcceptedHostOffer(final ClusterTopology topology, final HostOfferResponse response, final HostImpl host) { final String hostName = host.getHostName(); try { topology.addHostToTopology(response.getHostGroupName(), hostName); @@ -722,19 +736,24 @@ public class TopologyManager { throw new RuntimeException(e); } - - LOG.info("TopologyManager.processAcceptedHostOffer: about to execute tasks for host = {}", - hostName); - - for (TopologyTask task : response.getTasks()) { - LOG.info("Processing accepted host offer for {} which responded {} and task {}", - hostName, response.getAnswer(), task.getType()); - - task.init(topology, ambariContext); - executor.execute(task); + LOG.info("TopologyManager.processAcceptedHostOffer: queue tasks for host = {} which responded {}", hostName, response.getAnswer()); + if (parallelTaskCreationEnabled) { + executor.execute(new Runnable() { // do not start until cluster config done + @Override + public void run() { + queueHostTasks(topology, response, hostName); + } + }); + } else { + queueHostTasks(topology, response, hostName); } } + private void queueHostTasks(ClusterTopology topology, HostOfferResponse response, String hostName) { + LOG.info("TopologyManager.processAcceptedHostOffer: queueing tasks for host = {}", hostName); + response.executeTasks(taskExecutor, hostName, topology, ambariContext); + } + private void updateHostWithRackInfo(ClusterTopology topology, HostOfferResponse response, HostImpl host) { // the rack info from the cluster creation template String rackInfoFromTemplate = topology.getHostGroupInfo().get(response.getHostGroupName()).getHostRackInfo().get @@ -878,7 +897,7 @@ public class TopologyManager { } ConfigureClusterTask configureClusterTask = new ConfigureClusterTask(topology, configurationRequest); - AsyncCallableService asyncCallableService = new AsyncCallableService(configureClusterTask, timeout, delay, + AsyncCallableService asyncCallableService = new AsyncCallableService<>(configureClusterTask, timeout, delay, Executors.newScheduledThreadPool(1)); executor.submit(asyncCallableService); http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java b/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java index f90cf76..b0bcc58 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java @@ -1039,8 +1039,6 @@ public class ConfigurationTest { /** * Tests that every {@link ConfigurationProperty} field in * {@link Configuration} has a property {@link Markdown} annotation. - * - * @throws Exception */ @Test public void testAllPropertiesHaveMarkdownDescriptions() throws Exception { @@ -1052,9 +1050,9 @@ public class ConfigurationTest { ConfigurationProperty configurationProperty = (ConfigurationProperty) field.get(null); Markdown markdown = field.getAnnotation(Markdown.class); - if( null == markdown ){ + if (null == markdown) { ConfigurationMarkdown configMarkdown = field.getAnnotation(ConfigurationMarkdown.class); - markdown = configMarkdown.markdown(); + markdown = configMarkdown != null ? configMarkdown.markdown() : null; } Assert.assertNotNull("The configuration property " + configurationProperty.getKey()