Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 23724200D26 for ; Thu, 5 Oct 2017 12:46:42 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 21EE31609E1; Thu, 5 Oct 2017 10:46:42 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C20FB160BDB for ; Thu, 5 Oct 2017 12:46:39 +0200 (CEST) Received: (qmail 9336 invoked by uid 500); 5 Oct 2017 10:46:38 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 9311 invoked by uid 99); 5 Oct 2017 10:46:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Oct 2017 10:46:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8A29FDF9FE; Thu, 5 Oct 2017 10:46:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mpapirkovskyy@apache.org To: commits@ambari.apache.org Date: Thu, 05 Oct 2017 10:46:38 -0000 Message-Id: <6de29033371946cf81ec3f859b1688f1@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/4] ambari git commit: AMBARI-22063. Poor performance of STOMP subscriptions cache and registration handling. (mpapirkovskyy) archived-at: Thu, 05 Oct 2017 10:46:42 -0000 http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HeartbeatController.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HeartbeatController.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HeartbeatController.java index 81fb300..3bf9e79 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HeartbeatController.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HeartbeatController.java @@ -17,6 +17,16 @@ */ package org.apache.ambari.server.agent.stomp; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import javax.ws.rs.WebApplicationException; import org.apache.ambari.server.AmbariException; @@ -27,78 +37,138 @@ import org.apache.ambari.server.agent.HeartBeatResponse; import org.apache.ambari.server.agent.Register; import org.apache.ambari.server.agent.RegistrationResponse; import org.apache.ambari.server.agent.RegistrationStatus; +import org.apache.ambari.server.configuration.Configuration; +import org.apache.ambari.server.configuration.spring.GuiceBeansConfig; import org.apache.ambari.server.state.cluster.ClustersImpl; import org.apache.ambari.server.state.fsm.InvalidStateTransitionException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Import; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.simp.annotation.SendToUser; -import org.springframework.messaging.simp.annotation.SubscribeMapping; import org.springframework.stereotype.Controller; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Injector; +import com.google.inject.persist.UnitOfWork; @Controller @SendToUser("/") @MessageMapping("/") +@Import(GuiceBeansConfig.class) public class HeartbeatController { - private static Log LOG = LogFactory.getLog(HeartbeatController.class); + private static Logger LOG = LoggerFactory.getLogger(HeartbeatController.class); private final HeartBeatHandler hh; private final ClustersImpl clusters; private final AgentSessionManager agentSessionManager; + private final LinkedBlockingQueue queue; + private final ThreadFactory threadFactoryExecutor = new ThreadFactoryBuilder().setNameFormat("agent-register-processor-%d").build(); + private final ThreadFactory threadFactoryTimeout = new ThreadFactoryBuilder().setNameFormat("agent-register-timeout-%d").build(); + private final ExecutorService executor; + private final ScheduledExecutorService scheduledExecutorService; + private final UnitOfWork unitOfWork; + + @Autowired + private AgentsRegistrationQueue agentsRegistrationQueue; public HeartbeatController(Injector injector) { hh = injector.getInstance(HeartBeatHandler.class); clusters = injector.getInstance(ClustersImpl.class); + unitOfWork = injector.getInstance(UnitOfWork.class); agentSessionManager = injector.getInstance(AgentSessionManager.class); + + Configuration configuration = injector.getInstance(Configuration.class); + queue = new LinkedBlockingQueue(configuration.getAgentsRegistrationQueueSize()); + executor = new ThreadPoolExecutor(configuration.getRegistrationThreadPoolSize(), + configuration.getRegistrationThreadPoolSize(), 0L, TimeUnit.MILLISECONDS, queue, threadFactoryExecutor); + scheduledExecutorService = Executors.newScheduledThreadPool(1, threadFactoryTimeout); } - @SubscribeMapping("/register") - public RegistrationResponse register(@Header String simpSessionId, Register message) + @MessageMapping("/register") + public CompletableFuture register(@Header String simpSessionId, Register message) throws WebApplicationException, InvalidStateTransitionException, AmbariException { + CompletableFuture completableFuture = new CompletableFuture<>(); - /* Call into the heartbeat handler */ + Future future = executor.submit(() -> { + try { + unitOfWork.begin(); + RegistrationResponse response = null; + try { + /* Call into the heartbeat handler */ + response = hh.handleRegistration(message); + agentSessionManager.register(simpSessionId, + clusters.getHost(message.getHostname())); + LOG.debug("Sending registration response " + response); + } catch (Exception ex) { + LOG.info(ex.getMessage(), ex); + response = new RegistrationResponse(); + response.setResponseId(-1); + response.setResponseStatus(RegistrationStatus.FAILED); + response.setExitstatus(1); + response.setLog(ex.getMessage()); + completableFuture.complete(response); + return response; + } + completableFuture.complete(response); + return response; + } finally { + unitOfWork.end(); + } + }); - RegistrationResponse response = null; - try { - response = hh.handleRegistration(message); - LOG.debug("Sending registration response " + response); - } catch (AmbariException ex) { - response = new RegistrationResponse(); - response.setResponseId(-1); - response.setResponseStatus(RegistrationStatus.FAILED); - response.setExitstatus(1); - response.setLog(ex.getMessage()); - return response; - } - agentSessionManager.register(simpSessionId, - clusters.getHost(message.getHostname())); - return response; + scheduledExecutorService.schedule(new RegistrationTimeoutTask(future, completableFuture), 8, TimeUnit.SECONDS); + return completableFuture; } - @SubscribeMapping("/heartbeat") + @MessageMapping("/heartbeat") public HeartBeatResponse heartbeat(@Header String simpSessionId, HeartBeat message) { - if (LOG.isDebugEnabled()) { - LOG.debug("Received Heartbeat message " + message); - } - HeartBeatResponse heartBeatResponse; try { - if (!agentSessionManager.isRegistered(simpSessionId)) { - //Server restarted, or unknown host. - LOG.error(String.format("Host with [%s] sessionId not registered", simpSessionId)); - return hh.createRegisterCommand(); - } - message.setHostname(agentSessionManager.getHost(simpSessionId).getHostName()); - heartBeatResponse = hh.handleHeartBeat(message); + unitOfWork.begin(); if (LOG.isDebugEnabled()) { - LOG.debug("Sending heartbeat response with response id " + heartBeatResponse.getResponseId()); - LOG.debug("Response details " + heartBeatResponse); + LOG.debug("Received Heartbeat message " + message); + } + HeartBeatResponse heartBeatResponse; + try { + if (!agentSessionManager.isRegistered(simpSessionId)) { + //Server restarted, or unknown host. + LOG.error(String.format("Host with [%s] sessionId not registered", simpSessionId)); + return hh.createRegisterCommand(); + } + message.setHostname(agentSessionManager.getHost(simpSessionId).getHostName()); + heartBeatResponse = hh.handleHeartBeat(message); + agentsRegistrationQueue.complete(simpSessionId); + if (LOG.isDebugEnabled()) { + LOG.debug("Sending heartbeat response with response id " + heartBeatResponse.getResponseId()); + LOG.debug("Response details " + heartBeatResponse); + } + } catch (Exception e) { + LOG.warn("Error in HeartBeat", e); + throw new WebApplicationException(500); + } + return heartBeatResponse; + } finally { + unitOfWork.end(); + } + } + + private class RegistrationTimeoutTask implements Runnable { + private Future task; + private CompletableFuture completableFuture; + + public RegistrationTimeoutTask(Future task, CompletableFuture completableFuture) { + + this.task = task; + this.completableFuture = completableFuture; + } + + @Override + public void run() { + boolean cancelled = task.cancel(false); + if (cancelled) { + completableFuture.cancel(false); } - } catch (Exception e) { - LOG.warn("Error in HeartBeat", e); - throw new WebApplicationException(500); } - return heartBeatResponse; } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java index 4e6d37c..7e02590 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java @@ -44,14 +44,14 @@ public class HostLevelParamsHolder extends AgentHostDataHolder hostLevelParamsClusters = new TreeMap<>(); - for (Cluster cl : clusters.getClustersForHost(hostName)) { - Host host = clusters.getHost(hostName); + Host host = clusters.getHostById(hostId); + for (Cluster cl : clusters.getClustersForHost(host.getHostName())) { //TODO fix repo info host param HostLevelParamsCluster hostLevelParamsCluster = new HostLevelParamsCluster( null,//ambariMetaInfo.getRepoInfo(cl, host), - recoveryConfigHelper.getRecoveryConfig(cl.getClusterName(), hostName)); + recoveryConfigHelper.getRecoveryConfig(cl.getClusterName(), host.getHostName())); hostLevelParamsClusters.put(Long.toString(cl.getClusterId()), hostLevelParamsCluster); @@ -62,7 +62,7 @@ public class HostLevelParamsHolder extends AgentHostDataHolder - * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ClusterConfigs.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ClusterConfigs.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ClusterConfigs.java index 6e01cac..22a28f0 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ClusterConfigs.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ClusterConfigs.java @@ -18,33 +18,33 @@ package org.apache.ambari.server.agent.stomp.dto; -import java.util.Map; +import java.util.SortedMap; import com.fasterxml.jackson.annotation.JsonInclude; @JsonInclude(JsonInclude.Include.NON_EMPTY) public class ClusterConfigs { - private Map> configurations; - private Map>> configurationAttributes; + private SortedMap> configurations; + private SortedMap>> configurationAttributes; - public ClusterConfigs(Map> configurations, Map>> configurationAttributes) { + public ClusterConfigs(SortedMap> configurations, SortedMap>> configurationAttributes) { this.configurations = configurations; this.configurationAttributes = configurationAttributes; } - public Map> getConfigurations() { + public SortedMap> getConfigurations() { return configurations; } - public void setConfigurations(Map> configurations) { + public void setConfigurations(SortedMap> configurations) { this.configurations = configurations; } - public Map>> getConfigurationAttributes() { + public SortedMap>> getConfigurationAttributes() { return configurationAttributes; } - public void setConfigurationAttributes(Map>> configurationAttributes) { + public void setConfigurationAttributes(SortedMap>> configurationAttributes) { this.configurationAttributes = configurationAttributes; } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/Hash.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/Hash.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/Hash.java index 8a62d6b..19f9597 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/Hash.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/Hash.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataCluster.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataCluster.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataCluster.java index 8210779..5620209 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataCluster.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataCluster.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataServiceInfo.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataServiceInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataServiceInfo.java index a84b509..9b4bc02 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataServiceInfo.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataServiceInfo.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyCluster.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyCluster.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyCluster.java index c350b5f6..c17ec7f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyCluster.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyCluster.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyComponent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyComponent.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyComponent.java index 1060232..d96180d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyComponent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyComponent.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyHost.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyHost.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyHost.java index 8d2b627..9cdf944 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyHost.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyHost.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java index 0e519fd..2fd22d5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java @@ -2114,6 +2114,49 @@ public class Configuration { "messaging.threadpool.size", 1); /** + * The thread pool size for agents registration. + */ + @Markdown(description = "Thread pool size for agents registration") + public static final ConfigurationProperty REGISTRATION_THREAD_POOL_SIZE = new ConfigurationProperty<>( + "registration.threadpool.size", 10); + + /** + * Maximal cache size for spring subscription registry. + */ + @Markdown(description = "Maximal cache size for spring subscription registry.") + public static final ConfigurationProperty SUBSCRIPTION_REGISTRY_CACHE_MAX_SIZE = new ConfigurationProperty<>( + "subscription.registry.cache.size", 1500); + + /** + * Queue size for agents in registration. + */ + @Markdown(description = "Queue size for agents in registration.") + public static final ConfigurationProperty AGENTS_REGISTRATION_QUEUE_SIZE = new ConfigurationProperty<>( + "agents.registration.queue.size", 200); + + + /** + * Period in seconds with agents reports will be processed. + */ + @Markdown(description = "Period in seconds with agents reports will be processed.") + public static final ConfigurationProperty AGENTS_REPORT_PROCESSING_PERIOD = new ConfigurationProperty<>( + "agents.reports.processing.period", 1); + + /** + * Timeout in seconds before start processing of agents' reports. + */ + @Markdown(description = "Timeout in seconds before start processing of agents' reports.") + public static final ConfigurationProperty AGENTS_REPORT_PROCESSING_START_TIMEOUT = new ConfigurationProperty<>( + "agents.reports.processing.start.timeout", 5); + + /** + * Thread pool size for agents reports processing. + */ + @Markdown(description = "Thread pool size for agents reports processing.") + public static final ConfigurationProperty AGENTS_REPORT_THREAD_POOL_SIZE = new ConfigurationProperty<>( + "agents.reports.thread.pool.size", 10); + + /** * The maximum number of threads used to extract Ambari Views when Ambari * Server is starting up. */ @@ -4791,6 +4834,49 @@ public class Configuration { } /** + * @return max thread pool size for agents registration, default 10 + */ + public int getRegistrationThreadPoolSize() { + return Integer.parseInt(getProperty(REGISTRATION_THREAD_POOL_SIZE)); + } + + /** + * @return max cache size for spring subscription registry. + */ + public int getSubscriptionRegistryCacheSize() { + return Integer.parseInt(getProperty(SUBSCRIPTION_REGISTRY_CACHE_MAX_SIZE)); + } + + /** + * @return queue size for agents in registration. + */ + public int getAgentsRegistrationQueueSize() { + return Integer.parseInt(getProperty(AGENTS_REGISTRATION_QUEUE_SIZE)); + } + + + /** + * @return period in seconds with agents reports will be processed. + */ + public int getAgentsReportProcessingPeriod() { + return Integer.parseInt(getProperty(AGENTS_REPORT_PROCESSING_PERIOD)); + } + + /** + * @return timeout in seconds before start processing of agents' reports. + */ + public int getAgentsReportProcessingStartTimeout() { + return Integer.parseInt(getProperty(AGENTS_REPORT_PROCESSING_START_TIMEOUT)); + } + + /** + * @return thread pool size for agents reports processing. + */ + public int getAgentsReportThreadPoolSize() { + return Integer.parseInt(getProperty(AGENTS_REPORT_THREAD_POOL_SIZE)); + } + + /** * @return max thread pool size for agents, default 25 */ public int getAgentThreadPoolSize() { http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentRegisteringQueueChecker.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentRegisteringQueueChecker.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentRegisteringQueueChecker.java new file mode 100644 index 0000000..3f37353 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentRegisteringQueueChecker.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.configuration.spring; + +import org.apache.ambari.server.agent.stomp.AgentsRegistrationQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.simp.SimpMessageType; +import org.springframework.messaging.simp.stomp.StompCommand; +import org.springframework.messaging.simp.stomp.StompHeaderAccessor; +import org.springframework.messaging.support.ChannelInterceptorAdapter; +import org.springframework.messaging.support.MessageBuilder; + +public class AgentRegisteringQueueChecker extends ChannelInterceptorAdapter { + private static final Logger LOG = LoggerFactory.getLogger(AgentsRegistrationQueue.class); + + @Autowired + private AgentsRegistrationQueue agentsRegistrationQueue; + + @Override + public Message preSend(Message message, MessageChannel channel) { + StompHeaderAccessor headerAccessor= StompHeaderAccessor.wrap(message); + String sessionId = headerAccessor.getHeader("simpSessionId").toString(); + if (SimpMessageType.CONNECT_ACK.equals(headerAccessor.getMessageType()) + && !agentsRegistrationQueue.offer(sessionId)) { + StompHeaderAccessor headerAccessorError = StompHeaderAccessor.create(StompCommand.ERROR); + headerAccessorError.setHeader("simpSessionId", sessionId); + headerAccessorError.setHeader("simpConnectMessage", headerAccessor.getHeader("simpConnectMessage").toString()); + headerAccessorError.setMessage("Connection not allowed"); + + return MessageBuilder.createMessage(new byte[0], headerAccessorError.getMessageHeaders()); + } else if (SimpMessageType.DISCONNECT_ACK.equals(headerAccessor.getMessageType())) { + agentsRegistrationQueue.complete(sessionId); + } + return message; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java index 387bdda..516c405 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java @@ -19,6 +19,7 @@ package org.apache.ambari.server.configuration.spring; import org.apache.ambari.server.agent.stomp.HeartbeatController; import org.apache.ambari.server.api.stomp.TestController; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @@ -32,10 +33,13 @@ import com.google.inject.Injector; @Configuration @EnableWebSocketMessageBroker @ComponentScan(basePackageClasses = {TestController.class, HeartbeatController.class}) -@Import(RootStompConfig.class) +@Import({RootStompConfig.class,GuiceBeansConfig.class}) public class AgentStompConfig extends AbstractWebSocketMessageBrokerConfigurer { private org.apache.ambari.server.configuration.Configuration configuration; + @Autowired + private AgentRegisteringQueueChecker agentRegisteringQueueChecker; + public AgentStompConfig(Injector injector) { configuration = injector.getInstance(org.apache.ambari.server.configuration.Configuration.class); } @@ -55,5 +59,6 @@ public class AgentStompConfig extends AbstractWebSocketMessageBrokerConfigurer { @Override public void configureClientOutboundChannel(ChannelRegistration registration) { registration.taskExecutor().corePoolSize(configuration.getSpringMessagingThreadPoolSize()); + registration.setInterceptors(agentRegisteringQueueChecker); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/GuiceBeansConfig.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/GuiceBeansConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/GuiceBeansConfig.java index 50c3aba..baa9d6e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/GuiceBeansConfig.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/GuiceBeansConfig.java @@ -17,6 +17,7 @@ */ package org.apache.ambari.server.configuration.spring; +import org.apache.ambari.server.agent.stomp.AgentsRegistrationQueue; import org.apache.ambari.server.audit.AuditLogger; import org.apache.ambari.server.security.authorization.AmbariLdapAuthenticationProvider; import org.apache.ambari.server.security.authorization.AmbariLocalUserProvider; @@ -89,4 +90,15 @@ public class GuiceBeansConfig { return injector.getInstance(AmbariPamAuthenticationProvider.class); } + + @Bean + public AgentRegisteringQueueChecker agentRegisteringQueueChecker() { + return new AgentRegisteringQueueChecker(); + } + + @Bean + public AgentsRegistrationQueue agentsRegistrationQueue() { + return new AgentsRegistrationQueue(injector); + } + } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java index 667022e..fda0607 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java @@ -22,6 +22,7 @@ import java.util.List; import javax.servlet.ServletContext; +import org.apache.ambari.server.agent.stomp.AmbariSubscriptionRegistry; import org.apache.ambari.server.api.AmbariSendToMethodReturnValueHandler; import org.apache.ambari.server.events.listeners.requests.StateUpdateListener; import org.eclipse.jetty.websocket.server.WebSocketServerFactory; @@ -30,14 +31,14 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessagingException; import org.springframework.messaging.handler.annotation.MessageExceptionHandler; import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.messaging.simp.annotation.SendToUser; import org.springframework.messaging.simp.annotation.support.SendToMethodReturnValueHandler; import org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler; +import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler; +import org.springframework.messaging.support.ErrorMessage; import org.springframework.web.bind.annotation.ControllerAdvice; import org.springframework.web.socket.server.jetty.JettyRequestUpgradeStrategy; import org.springframework.web.socket.server.support.DefaultHandshakeHandler; @@ -52,8 +53,11 @@ public class RootStompConfig { private final ServletContext servletContext; - public RootStompConfig(ServletContext servletContext) { + private final org.apache.ambari.server.configuration.Configuration configuration; + + public RootStompConfig(ServletContext servletContext, Injector injector) { this.servletContext = servletContext; + configuration = injector.getInstance(org.apache.ambari.server.configuration.Configuration.class); } @Bean @@ -69,6 +73,13 @@ public class RootStompConfig { } @Autowired + public void configureRegistryCacheSize(SimpleBrokerMessageHandler simpleBrokerMessageHandler) throws NoSuchFieldException, IllegalAccessException { + AmbariSubscriptionRegistry defaultSubscriptionRegistry = + new AmbariSubscriptionRegistry(configuration.getSubscriptionRegistryCacheSize()); + simpleBrokerMessageHandler.setSubscriptionRegistry(defaultSubscriptionRegistry); + } + + @Autowired public void configureGlobal(SimpAnnotationMethodMessageHandler messageHandler) { List handlers = new ArrayList<>(messageHandler.getReturnValueHandlers()); List changedHandlers = new ArrayList<>(); @@ -94,26 +105,14 @@ public class RootStompConfig { public static class ExceptionHandlingAdvice{ private static final Logger LOG = LoggerFactory.getLogger(ExceptionHandlingAdvice.class); - - @MessageExceptionHandler(MessagingException.class) + @MessageExceptionHandler(Exception.class) @SendToUser("/") - public ErrorMessage handle(MessagingException e) { - - LOG.error("Exception caught while processing message", e); + public ErrorMessage handle(Exception e) { + //LOG.error("Exception caught while processing message: " + e.getMessage(), e); return new ErrorMessage(e); } - static class ErrorMessage { - Message failedMessage; - String exception; - - ErrorMessage(MessagingException e) { - this.failedMessage = e.getFailedMessage(); - this.exception = e.getLocalizedMessage(); - } - } - } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java index f0f13e1..4b37d00 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java @@ -174,6 +174,9 @@ public interface AmbariManagementController { Set getHostComponents( Set requests) throws AmbariException; + Set getHostComponents( + Set requests, boolean statusOnly) throws AmbariException; + /** * Gets the configurations identified by the given request objects. * http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java index 8769da9..817f340 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java @@ -137,6 +137,7 @@ import org.apache.ambari.server.orm.dao.ClusterDAO; import org.apache.ambari.server.orm.dao.ExtensionDAO; import org.apache.ambari.server.orm.dao.ExtensionLinkDAO; import org.apache.ambari.server.orm.dao.RepositoryVersionDAO; +import org.apache.ambari.server.orm.dao.ServiceComponentDesiredStateDAO; import org.apache.ambari.server.orm.dao.SettingDAO; import org.apache.ambari.server.orm.dao.StackDAO; import org.apache.ambari.server.orm.dao.WidgetDAO; @@ -147,6 +148,7 @@ import org.apache.ambari.server.orm.entities.HostEntity; import org.apache.ambari.server.orm.entities.OperatingSystemEntity; import org.apache.ambari.server.orm.entities.RepositoryEntity; import org.apache.ambari.server.orm.entities.RepositoryVersionEntity; +import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntity; import org.apache.ambari.server.orm.entities.SettingEntity; import org.apache.ambari.server.orm.entities.WidgetEntity; import org.apache.ambari.server.orm.entities.WidgetLayoutEntity; @@ -360,6 +362,9 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle @Inject private Provider m_agentConfigsHolder; + @Inject + private ServiceComponentDesiredStateDAO serviceComponentDesiredStateDAO; + /** * The KerberosHelper to help setup for enabling for disabling Kerberos */ @@ -694,16 +699,35 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle void persistServiceComponentHosts(Set requests) throws AmbariException { Multimap schMap = ArrayListMultimap.create(); + Map>> serviceComponentNames = new HashMap<>(); + Map>> serviceComponentDesiredStateEntities = new HashMap<>(); for (ServiceComponentHostRequest request : requests) { Cluster cluster = clusters.getCluster(request.getClusterName()); Service s = cluster.getService(request.getServiceName()); ServiceComponent sc = s.getServiceComponent( request.getComponentName()); + serviceComponentNames.computeIfAbsent(sc.getClusterId(), c -> new HashMap<>()) + .computeIfAbsent(sc.getServiceName(), h ->new ArrayList<>()).add(sc.getName()); + } - ServiceComponentHost sch = - serviceComponentHostFactory.createNew(sc, request.getHostname()); + List entities = serviceComponentDesiredStateDAO.findByNames(serviceComponentNames); + + for (ServiceComponentDesiredStateEntity stateEntity : entities) { + serviceComponentDesiredStateEntities.computeIfAbsent(stateEntity.getClusterId(), c -> new HashMap<>()) + .computeIfAbsent(stateEntity.getServiceName(), h ->new HashMap<>()) + .putIfAbsent(stateEntity.getComponentName(), stateEntity); + } + + for (ServiceComponentHostRequest request : requests) { + Cluster cluster = clusters.getCluster(request.getClusterName()); + Service s = cluster.getService(request.getServiceName()); + ServiceComponent sc = s.getServiceComponent( + request.getComponentName()); + ServiceComponentHost sch = + serviceComponentHostFactory.createNew(sc, request.getHostname(), + serviceComponentDesiredStateEntities.get(cluster.getClusterId()).get(s.getName()).get(sc.getName())); if (request.getDesiredState() != null && !request.getDesiredState().isEmpty()) { State state = State.valueOf(request.getDesiredState()); @@ -1205,6 +1229,11 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle private Set getHostComponents( ServiceComponentHostRequest request) throws AmbariException { + return getHostComponents(request, false); + } + + private Set getHostComponents( + ServiceComponentHostRequest request, boolean statusOnly) throws AmbariException { LOG.debug("Processing request {}", request); if (request.getClusterName() == null @@ -1348,7 +1377,9 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle } } - ServiceComponentHostResponse r = sch.convertToResponse(desiredConfigs); + ServiceComponentHostResponse r = statusOnly ? sch.convertToResponseStatusOnly(desiredConfigs, + filterBasedConfigStaleness) + : sch.convertToResponse(desiredConfigs); if (null == r || (filterBasedConfigStaleness && r.isStaleConfig() != staleConfig)) { continue; } @@ -1404,7 +1435,9 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle } } - ServiceComponentHostResponse r = sch.convertToResponse(desiredConfigs); + ServiceComponentHostResponse r = statusOnly ? sch.convertToResponseStatusOnly(desiredConfigs, + filterBasedConfigStaleness) + : sch.convertToResponse(desiredConfigs); if (null == r || (filterBasedConfigStaleness && r.isStaleConfig() != staleConfig)) { continue; } @@ -3859,12 +3892,18 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle @Override public Set getHostComponents( Set requests) throws AmbariException { + return getHostComponents(requests, false); + } + + @Override + public Set getHostComponents( + Set requests, boolean statusOnly) throws AmbariException { LOG.debug("Processing requests: {}", requests); Set response = new HashSet<>(); for (ServiceComponentHostRequest request : requests) { try { - response.addAll(getHostComponents(request)); + response.addAll(getHostComponents(request, statusOnly)); } catch (ServiceComponentHostNotFoundException | ServiceComponentNotFoundException | ServiceNotFoundException e) { if (requests.size() == 1) { // only throw exception if 1 request. http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java index 4143beb..c749846 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java @@ -134,6 +134,7 @@ import org.eclipse.jetty.server.SessionIdManager; import org.eclipse.jetty.server.SslConnectionFactory; import org.eclipse.jetty.server.handler.HandlerCollection; import org.eclipse.jetty.server.handler.RequestLogHandler; +import org.eclipse.jetty.server.handler.gzip.GzipHandler; import org.eclipse.jetty.server.session.DefaultSessionIdManager; import org.eclipse.jetty.server.session.SessionHandler; import org.eclipse.jetty.servlet.DefaultServlet; @@ -455,7 +456,14 @@ public class AmbariServer { enableLog4jMonitor(configsMap); - handlerList.addHandler(root); + GzipHandler gzipHandler = new GzipHandler(); + gzipHandler.setHandler(root); + + //TODO minimal set, perhaps is needed to add some other mime types + gzipHandler.setIncludedMimeTypes("text/html", "text/plain", "text/xml", "text/css", "application/javascript", + "application/x-javascript", "application/xml", "application/x-www-form-urlencoded", "application/json"); + handlerList.addHandler(gzipHandler); + server.setHandler(handlerList); ServletHolder agent = new ServletHolder(ServletContainer.class); @@ -512,7 +520,8 @@ public class AmbariServer { LOG.info("********* Initializing Clusters **********"); Clusters clusters = injector.getInstance(Clusters.class); StringBuilder clusterDump = new StringBuilder(); - clusters.debugDump(clusterDump); + //TODO temporally commented because takes a lot of time on 5k cluster + //clusters.debugDump(clusterDump); LOG.info("********* Current Clusters State *********"); LOG.info(clusterDump.toString()); http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java index c36e107..1fd6697 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java @@ -541,7 +541,7 @@ public class HostResourceProvider extends AbstractControllerResourceProvider { recoveryConfigHelper.getRecoveryConfig(clusters.getCluster(hostRequest.getClusterName()).getClusterName(), addedHost.getHostName()) )); - hostLevelParamsUpdateEvent.setHostName(addedHost.getHostName()); + hostLevelParamsUpdateEvent.setHostId(addedHost.getHostId()); hostLevelParamsUpdateEvents.add(hostLevelParamsUpdateEvent); } } @@ -550,12 +550,12 @@ public class HostResourceProvider extends AbstractControllerResourceProvider { // TODO add rack change to topology update updateHostRackInfoIfChanged(clusters, hostRequests); - TopologyUpdateEvent topologyUpdateEvent = - new TopologyUpdateEvent(addedTopologies, TopologyUpdateEvent.EventType.UPDATE); - topologyHolder.updateData(topologyUpdateEvent); for (HostLevelParamsUpdateEvent hostLevelParamsUpdateEvent : hostLevelParamsUpdateEvents) { hostLevelParamsHolder.updateData(hostLevelParamsUpdateEvent); } + TopologyUpdateEvent topologyUpdateEvent = + new TopologyUpdateEvent(addedTopologies, TopologyUpdateEvent.EventType.UPDATE); + topologyHolder.updateData(topologyUpdateEvent); } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/DefaultServiceCalculatedState.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/DefaultServiceCalculatedState.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/DefaultServiceCalculatedState.java index 5e02a64..3bad54c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/DefaultServiceCalculatedState.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/DefaultServiceCalculatedState.java @@ -92,7 +92,7 @@ public class DefaultServiceCalculatedState implements ServiceCalculatedState { serviceName, null, null, null); Set hostComponentResponses = - managementControllerProvider.get().getHostComponents(Collections.singleton(request)); + managementControllerProvider.get().getHostComponents(Collections.singleton(request), true); State masterState = null; State clientState = null; http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/FlumeServiceCalculatedState.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/FlumeServiceCalculatedState.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/FlumeServiceCalculatedState.java index ca4fe6e..6b22d3b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/FlumeServiceCalculatedState.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/FlumeServiceCalculatedState.java @@ -50,7 +50,7 @@ public final class FlumeServiceCalculatedState extends DefaultServiceCalculatedS serviceName, null, null, null); Set hostComponentResponses = - managementControllerProvider.get().getHostComponents(Collections.singleton(request)); + managementControllerProvider.get().getHostComponents(Collections.singleton(request), true); State state = State.UNKNOWN; for (ServiceComponentHostResponse schr : hostComponentResponses) { http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HBaseServiceCalculatedState.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HBaseServiceCalculatedState.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HBaseServiceCalculatedState.java index eac0dce..84b805a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HBaseServiceCalculatedState.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HBaseServiceCalculatedState.java @@ -53,7 +53,7 @@ public final class HBaseServiceCalculatedState extends DefaultServiceCalculatedS serviceName, null, null, null); Set hostComponentResponses = - managementControllerProvider.get().getHostComponents(Collections.singleton(request)); + managementControllerProvider.get().getHostComponents(Collections.singleton(request), true); int hBaseMasterActiveCount = 0; State nonStartedState = null; http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HDFSServiceCalculatedState.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HDFSServiceCalculatedState.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HDFSServiceCalculatedState.java index 7bbad2a..0db8d8a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HDFSServiceCalculatedState.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HDFSServiceCalculatedState.java @@ -53,7 +53,7 @@ public final class HDFSServiceCalculatedState extends DefaultServiceCalculatedSt serviceName, null, null, null); Set hostComponentResponses = - managementControllerProvider.get().getHostComponents(Collections.singleton(request)); + managementControllerProvider.get().getHostComponents(Collections.singleton(request), true); int nameNodeCount = 0; int nameNodeActiveCount = 0; http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HiveServiceCalculatedState.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HiveServiceCalculatedState.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HiveServiceCalculatedState.java index 77e44a5..756d19e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HiveServiceCalculatedState.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HiveServiceCalculatedState.java @@ -54,7 +54,7 @@ public final class HiveServiceCalculatedState extends DefaultServiceCalculatedSt serviceName, null, null, null); Set hostComponentResponses = - managementControllerProvider.get().getHostComponents(Collections.singleton(request)); + managementControllerProvider.get().getHostComponents(Collections.singleton(request), true); int activeHiveMetastoreComponentCount = 0; State nonStartedState = null; http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/OozieServiceCalculatedState.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/OozieServiceCalculatedState.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/OozieServiceCalculatedState.java index 1803f70..d43375c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/OozieServiceCalculatedState.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/OozieServiceCalculatedState.java @@ -54,7 +54,7 @@ public final class OozieServiceCalculatedState extends DefaultServiceCalculatedS serviceName, null, null, null); Set hostComponentResponses = - managementControllerProvider.get().getHostComponents(Collections.singleton(request)); + managementControllerProvider.get().getHostComponents(Collections.singleton(request), true); int oozieServerActiveCount = 0; State nonStartedState = null; http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/YARNServiceCalculatedState.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/YARNServiceCalculatedState.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/YARNServiceCalculatedState.java index 2f1619f..36c2421 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/YARNServiceCalculatedState.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/YARNServiceCalculatedState.java @@ -54,7 +54,7 @@ public final class YARNServiceCalculatedState extends DefaultServiceCalculatedSt serviceName, null, null, null); Set hostComponentResponses = - managementControllerProvider.get().getHostComponents(Collections.singleton(request)); + managementControllerProvider.get().getHostComponents(Collections.singleton(request), true); int resourceManagerActiveCount = 0; boolean isAppTimeLineServerActive = false; http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/AgentConfigsUpdateEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AgentConfigsUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AgentConfigsUpdateEvent.java index f0f17e1..89fc8bc 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/AgentConfigsUpdateEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AgentConfigsUpdateEvent.java @@ -42,7 +42,7 @@ public class AgentConfigsUpdateEvent extends AmbariHostUpdateEvent implements Ha /** * Host identifier. */ - private String hostName; + private Long hostId; /** * Configs grouped by cluster id as keys. @@ -64,17 +64,17 @@ public class AgentConfigsUpdateEvent extends AmbariHostUpdateEvent implements Ha this.hash = hash; } - public static AgentConfigsUpdateEvent emptyUpdate() { - return new AgentConfigsUpdateEvent(null); + public void setHostId(Long hostId) { + this.hostId = hostId; } - public void setHostName(String hostName) { - this.hostName = hostName; + @Override + public Long getHostId() { + return hostId; } - @Override - public String getHostName() { - return hostName; + public static AgentConfigsUpdateEvent emptyUpdate() { + return new AgentConfigsUpdateEvent(null); } @Override @@ -84,12 +84,12 @@ public class AgentConfigsUpdateEvent extends AmbariHostUpdateEvent implements Ha AgentConfigsUpdateEvent that = (AgentConfigsUpdateEvent) o; - return Objects.equals(hostName, that.hostName) && + return Objects.equals(hostId, that.hostId) && Objects.equals(clustersConfigs, that.clustersConfigs); } @Override public int hashCode() { - return Objects.hash(hostName, clustersConfigs); + return Objects.hash(hostId, clustersConfigs); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionsUpdateEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionsUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionsUpdateEvent.java index cc0b5fb..0bae130 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionsUpdateEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionsUpdateEvent.java @@ -37,17 +37,19 @@ public class AlertDefinitionsUpdateEvent extends AmbariHostUpdateEvent implement private final Map clusters; private final EventType eventType; private final String hostName; + private final Long hostId; private String hash; public static AlertDefinitionsUpdateEvent emptyEvent() { - return new AlertDefinitionsUpdateEvent(null, null, null); + return new AlertDefinitionsUpdateEvent(null, null, null, null); } - public AlertDefinitionsUpdateEvent(EventType eventType, Map clusters, String hostName) { + public AlertDefinitionsUpdateEvent(EventType eventType, Map clusters, String hostName, Long hostId) { super(Type.ALERT_DEFINITIONS); this.eventType = eventType; this.clusters = clusters != null ? Collections.unmodifiableMap(clusters) : null; this.hostName = hostName; + this.hostId = hostId; } @Override @@ -61,7 +63,6 @@ public class AlertDefinitionsUpdateEvent extends AmbariHostUpdateEvent implement this.hash = hash; } - @Override @JsonProperty("hostName") public String getHostName() { return hostName; @@ -93,6 +94,11 @@ public class AlertDefinitionsUpdateEvent extends AmbariHostUpdateEvent implement return Objects.hash(eventType, clusters); } + @Override + public Long getHostId() { + return hostId; + } + public enum EventType { /** Full current alert definitions */ CREATE, http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariHostUpdateEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariHostUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariHostUpdateEvent.java index 495e4c8..fe49906 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariHostUpdateEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariHostUpdateEvent.java @@ -25,11 +25,11 @@ import java.beans.Transient; public abstract class AmbariHostUpdateEvent extends AmbariUpdateEvent { /** - * Host name message will sent to. - * @return host name. + * Host id message will sent to. + * @return host id. */ @Transient - public abstract String getHostName(); + public abstract Long getHostId(); public AmbariHostUpdateEvent(Type type) { super(type); http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/ConfigsUpdateEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/ConfigsUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/ConfigsUpdateEvent.java index eefca6b..af4a9af 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/ConfigsUpdateEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/ConfigsUpdateEvent.java @@ -246,43 +246,4 @@ public class ConfigsUpdateEvent extends AmbariUpdateEvent { return result; } } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - ConfigsUpdateEvent that = (ConfigsUpdateEvent) o; - - if (serviceConfigId != null ? !serviceConfigId.equals(that.serviceConfigId) : that.serviceConfigId != null) - return false; - if (clusterId != null ? !clusterId.equals(that.clusterId) : that.clusterId != null) return false; - if (serviceName != null ? !serviceName.equals(that.serviceName) : that.serviceName != null) return false; - if (groupId != null ? !groupId.equals(that.groupId) : that.groupId != null) return false; - if (version != null ? !version.equals(that.version) : that.version != null) return false; - if (user != null ? !user.equals(that.user) : that.user != null) return false; - if (note != null ? !note.equals(that.note) : that.note != null) return false; - if (hostNames != null ? !hostNames.equals(that.hostNames) : that.hostNames != null) return false; - if (createTime != null ? !createTime.equals(that.createTime) : that.createTime != null) return false; - if (groupName != null ? !groupName.equals(that.groupName) : that.groupName != null) return false; - if (configs != null ? !configs.equals(that.configs) : that.configs != null) return false; - return changedConfigTypes != null ? changedConfigTypes.equals(that.changedConfigTypes) : that.changedConfigTypes == null; - } - - @Override - public int hashCode() { - int result = serviceConfigId != null ? serviceConfigId.hashCode() : 0; - result = 31 * result + (clusterId != null ? clusterId.hashCode() : 0); - result = 31 * result + (serviceName != null ? serviceName.hashCode() : 0); - result = 31 * result + (groupId != null ? groupId.hashCode() : 0); - result = 31 * result + (version != null ? version.hashCode() : 0); - result = 31 * result + (user != null ? user.hashCode() : 0); - result = 31 * result + (note != null ? note.hashCode() : 0); - result = 31 * result + (hostNames != null ? hostNames.hashCode() : 0); - result = 31 * result + (createTime != null ? createTime.hashCode() : 0); - result = 31 * result + (groupName != null ? groupName.hashCode() : 0); - result = 31 * result + (configs != null ? configs.hashCode() : 0); - result = 31 * result + (changedConfigTypes != null ? changedConfigTypes.hashCode() : 0); - return result; - } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/ExecutionCommandEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/ExecutionCommandEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/ExecutionCommandEvent.java index 8f8b5a0..c97ed60 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/ExecutionCommandEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/ExecutionCommandEvent.java @@ -31,9 +31,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; public class ExecutionCommandEvent extends AmbariHostUpdateEvent { /** - * Host name with agent execution commands will be send to. + * Host id with agent execution commands will be send to. */ - private String hostName; + private Long hostId; /** * Execution commands grouped by cluster id. @@ -54,15 +54,6 @@ public class ExecutionCommandEvent extends AmbariHostUpdateEvent { this.clusters = clusters; } - public void setHostName(String hostName) { - this.hostName = hostName; - } - - @Override - public String getHostName() { - return hostName; - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -70,14 +61,23 @@ public class ExecutionCommandEvent extends AmbariHostUpdateEvent { ExecutionCommandEvent that = (ExecutionCommandEvent) o; - if (hostName != null ? !hostName.equals(that.hostName) : that.hostName != null) return false; + if (hostId != null ? !hostId.equals(that.hostId) : that.hostId != null) return false; return clusters != null ? clusters.equals(that.clusters) : that.clusters == null; } @Override public int hashCode() { - int result = hostName != null ? hostName.hashCode() : 0; + int result = hostId != null ? hostId.hashCode() : 0; result = 31 * result + (clusters != null ? clusters.hashCode() : 0); return result; } + + public void setHostId(Long hostId) { + this.hostId = hostId; + } + + @Override + public Long getHostId() { + return hostId; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/HostLevelParamsUpdateEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/HostLevelParamsUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/HostLevelParamsUpdateEvent.java index d68e802..8948391 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/HostLevelParamsUpdateEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/HostLevelParamsUpdateEvent.java @@ -42,7 +42,7 @@ public class HostLevelParamsUpdateEvent extends AmbariHostUpdateEvent implements /** * Host identifier. */ - private String hostName; + private Long hostId; /** * Host level parameters by clusters. @@ -73,13 +73,13 @@ public class HostLevelParamsUpdateEvent extends AmbariHostUpdateEvent implements return new HostLevelParamsUpdateEvent(null); } - public void setHostName(String hostName) { - this.hostName = hostName; + public void setHostId(Long hostId) { + this.hostId = hostId; } @Override - public String getHostName() { - return hostName; + public Long getHostId() { + return hostId; } @Override @@ -89,12 +89,12 @@ public class HostLevelParamsUpdateEvent extends AmbariHostUpdateEvent implements HostLevelParamsUpdateEvent that = (HostLevelParamsUpdateEvent) o; - return Objects.equals(hostName, that.hostName) && + return Objects.equals(hostId, that.hostId) && Objects.equals(hostLevelParamsClusters, that.hostLevelParamsClusters); } @Override public int hashCode() { - return Objects.hash(hostName, hostLevelParamsClusters); + return Objects.hash(hostId, hostLevelParamsClusters); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java index d39c4ae..a458ea3 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java @@ -90,17 +90,13 @@ public class ServiceUpdateEvent extends AmbariUpdateEvent { ServiceUpdateEvent that = (ServiceUpdateEvent) o; if (clusterName != null ? !clusterName.equals(that.clusterName) : that.clusterName != null) return false; - if (maintenanceState != that.maintenanceState) return false; - if (serviceName != null ? !serviceName.equals(that.serviceName) : that.serviceName != null) return false; - return state == that.state; + return serviceName != null ? serviceName.equals(that.serviceName) : that.serviceName == null; } @Override public int hashCode() { int result = clusterName != null ? clusterName.hashCode() : 0; - result = 31 * result + (maintenanceState != null ? maintenanceState.hashCode() : 0); result = 31 * result + (serviceName != null ? serviceName.hashCode() : 0); - result = 31 * result + (state != null ? state.hashCode() : 0); return result; } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java index 548ea41..17e8c3d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java @@ -57,9 +57,9 @@ public class StateUpdateListener { String destination = event.getDestination(); if (event instanceof AmbariHostUpdateEvent) { AmbariHostUpdateEvent hostUpdateEvent = (AmbariHostUpdateEvent) event; - String hostName = hostUpdateEvent.getHostName(); - String sessionId = agentSessionManager.getSessionId(hostName); - LOG.debug("Received status update event {} for host {} registered with session ID {}", hostUpdateEvent, hostName, sessionId); + Long hostId = hostUpdateEvent.getHostId(); + String sessionId = agentSessionManager.getSessionId(hostId); + LOG.debug("Received status update event {} for host {} registered with session ID {}", hostUpdateEvent, hostId, sessionId); MessageHeaders headers = createHeaders(sessionId); simpMessagingTemplate.convertAndSendToUser(sessionId, destination, event, headers); } else { http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java index 24c8166..50dc144 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java @@ -18,7 +18,9 @@ package org.apache.ambari.server.events.listeners.services; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.EagerSingleton; @@ -63,23 +65,26 @@ public class ServiceUpdateListener { @Subscribe public void onHostComponentUpdate(HostComponentsUpdateEvent event) throws AmbariException { + Map> clustersServices = new HashMap<>(); for (HostComponentUpdate hostComponentUpdate : event.getHostComponentUpdates()) { Long clusterId = hostComponentUpdate.getClusterId(); - String clusterName = m_clusters.get().getClusterById(clusterId).getClusterName(); String serviceName = hostComponentUpdate.getServiceName(); + clustersServices.computeIfAbsent(clusterId, c -> new HashSet<>()).add(serviceName); + } + for (Map.Entry> clusterServices : clustersServices.entrySet()) { + Long clusterId = clusterServices.getKey(); + String clusterName = m_clusters.get().getClusterById(clusterId).getClusterName(); + for (String serviceName : clusterServices.getValue()) { + ServiceCalculatedState serviceCalculatedState = ServiceCalculatedStateFactory.getServiceStateProvider(serviceName); + State serviceState = serviceCalculatedState.getState(clusterName, serviceName); - ServiceCalculatedState serviceCalculatedState = ServiceCalculatedStateFactory.getServiceStateProvider(serviceName); - State serviceState = serviceCalculatedState.getState(clusterName, serviceName); - - // retrieve state from cache - if (states.containsKey(clusterId) && states.get(clusterId).containsKey(serviceName) && states.get(clusterId).get(serviceName).equals(serviceState)) { - continue; - } - if (!states.containsKey(clusterId)) { - states.put(clusterId, new HashMap<>()); + // retrieve state from cache + if (states.containsKey(clusterId) && states.get(clusterId).containsKey(serviceName) && states.get(clusterId).get(serviceName).equals(serviceState)) { + continue; + } + states.computeIfAbsent(clusterId, c -> new HashMap<>()).put(serviceName, serviceState); + stateUpdateEventPublisher.publish(new ServiceUpdateEvent(clusterName, null, serviceName, serviceState)); } - states.get(clusterId).put(serviceName, serviceState); - stateUpdateEventPublisher.publish(new ServiceUpdateEvent(clusterName, null, serviceName, serviceState)); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java index 1dd8a5b..58bfcfc 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java @@ -42,17 +42,12 @@ import org.apache.ambari.server.events.TaskCreateEvent; import org.apache.ambari.server.events.TaskUpdateEvent; import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher; import org.apache.ambari.server.events.publishers.TaskEventPublisher; -import org.apache.ambari.server.orm.dao.ClusterDAO; -import org.apache.ambari.server.orm.dao.HostRoleCommandDAO; import org.apache.ambari.server.orm.dao.RequestDAO; import org.apache.ambari.server.orm.dao.StageDAO; -import org.apache.ambari.server.orm.entities.ClusterEntity; -import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; import org.apache.ambari.server.orm.entities.RequestEntity; import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity; import org.apache.ambari.server.orm.entities.StageEntity; import org.apache.ambari.server.orm.entities.StageEntityPK; -import org.apache.ambari.server.topology.TopologyManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,26 +95,14 @@ public class TaskStatusListener { private RequestDAO requestDAO; - private HostRoleCommandDAO hostRoleCommandDAO; - - private TopologyManager topologyManager; - private StateUpdateEventPublisher stateUpdateEventPublisher; - private ClusterDAO clusterDAO; - @Inject public TaskStatusListener(TaskEventPublisher taskEventPublisher, StageDAO stageDAO, RequestDAO requestDAO, - StateUpdateEventPublisher stateUpdateEventPublisher, - HostRoleCommandDAO hostRoleCommandDAO, - TopologyManager topologyManager, - ClusterDAO clusterDAO) { + StateUpdateEventPublisher stateUpdateEventPublisher) { this.stageDAO = stageDAO; this.requestDAO = requestDAO; this.stateUpdateEventPublisher = stateUpdateEventPublisher; - this.hostRoleCommandDAO = hostRoleCommandDAO; - this.topologyManager = topologyManager; - this.clusterDAO = clusterDAO; taskEventPublisher.register(this); } @@ -307,23 +290,13 @@ public class TaskStatusListener { * @param requestIdsWithReceivedTaskStatus set of request ids that has received tasks status * @param stagesWithChangedTaskStatus set of stages that have received tasks with changed status */ - private void updateActiveRequestsStatus(final Set requestIdsWithReceivedTaskStatus, Set stagesWithChangedTaskStatus) throws ClusterNotFoundException { + private void updateActiveRequestsStatus(final Set requestIdsWithReceivedTaskStatus, Set stagesWithChangedTaskStatus) { for (Long reportedRequestId : requestIdsWithReceivedTaskStatus) { if (activeRequestMap.containsKey(reportedRequestId)) { ActiveRequest request = activeRequestMap.get(reportedRequestId); Boolean didStatusChange = updateRequestStatus(reportedRequestId, stagesWithChangedTaskStatus); if (didStatusChange) { - RequestEntity updated = requestDAO.updateStatus(reportedRequestId, request.getStatus(), request.getDisplayStatus()); - ClusterEntity clusterEntity = clusterDAO.findById(updated.getClusterId()); - if (clusterEntity == null) { - throw new ClusterNotFoundException(updated.getClusterId()); - } - List hostRoleCommandEntities = hostRoleCommandDAO.findByRequest(updated.getRequestId()); - stateUpdateEventPublisher.publish(new RequestUpdateEvent(updated, - hostRoleCommandDAO, - topologyManager, - clusterEntity.getClusterName(), - hostRoleCommandEntities)); + requestDAO.updateStatus(reportedRequestId, request.getStatus(), request.getDisplayStatus()); } if (request.isCompleted() && isAllTasksCompleted(reportedRequestId)) { // Request is considered ton have been finished if request status and all of it's tasks status are completed http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java index e09d5ca..bacdc2d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java @@ -70,31 +70,31 @@ public class AgentCommandsPublisher { @Inject private StateUpdateEventPublisher stateUpdateEventPublisher; - public void sendAgentCommand(Multimap agentCommands) throws AmbariException { + public void sendAgentCommand(Multimap agentCommands) throws AmbariException { if (agentCommands != null && !agentCommands.isEmpty()) { - Map> executionCommandsClusters = new TreeMap<>(); - for (Map.Entry acHostEntry : agentCommands.entries()) { - String hostName = acHostEntry.getKey(); + Map> executionCommandsClusters = new TreeMap<>(); + for (Map.Entry acHostEntry : agentCommands.entries()) { + Long hostId = acHostEntry.getKey(); AgentCommand ac = acHostEntry.getValue(); - populateExecutionCommandsClusters(executionCommandsClusters, hostName, ac); + populateExecutionCommandsClusters(executionCommandsClusters, hostId, ac); } - for (Map.Entry> hostEntry : executionCommandsClusters.entrySet()) { - String hostName = hostEntry.getKey(); + for (Map.Entry> hostEntry : executionCommandsClusters.entrySet()) { + Long hostId = hostEntry.getKey(); ExecutionCommandEvent executionCommandEvent = new ExecutionCommandEvent(hostEntry.getValue()); - executionCommandEvent.setHostName(hostName); + executionCommandEvent.setHostId(hostId); stateUpdateEventPublisher.publish(executionCommandEvent); } } } - public void sendAgentCommand(String hostName, AgentCommand agentCommand) throws AmbariException { - Multimap agentCommands = ArrayListMultimap.create(); - agentCommands.put(hostName, agentCommand); + public void sendAgentCommand(Long hostId, AgentCommand agentCommand) throws AmbariException { + Multimap agentCommands = ArrayListMultimap.create(); + agentCommands.put(hostId, agentCommand); sendAgentCommand(agentCommands); } - private void populateExecutionCommandsClusters(Map> executionCommandsClusters, - String hostName, AgentCommand ac) throws AmbariException { + private void populateExecutionCommandsClusters(Map> executionCommandsClusters, + Long hostId, AgentCommand ac) throws AmbariException { try { if (LOG.isDebugEnabled()) { LOG.debug("Sending command string = " + StageUtils.jaxbToString(ac)); @@ -114,7 +114,7 @@ public class AgentCommandsPublisher { if ("SET_KEYTAB".equalsIgnoreCase(customCommand) || "REMOVE_KEYTAB".equalsIgnoreCase(customCommand)) { LOG.info(String.format("%s called", customCommand)); try { - injectKeytab(ec, customCommand, hostName); + injectKeytab(ec, customCommand, clusters.getHostById(hostId).getHostName()); } catch (IOException e) { throw new AmbariException("Could not inject keytab into command", e); } @@ -126,15 +126,15 @@ public class AgentCommandsPublisher { clusterId = Long.toString(clusters.getCluster(clusterName).getClusterId()); } ec.setClusterId(clusterId); - prepareExecutionCommandsClusters(executionCommandsClusters, hostName, clusterId); - executionCommandsClusters.get(hostName).get(clusterId).getExecutionCommands().add((ExecutionCommand) ac); + prepareExecutionCommandsClusters(executionCommandsClusters, hostId, clusterId); + executionCommandsClusters.get(hostId).get(clusterId).getExecutionCommands().add((ExecutionCommand) ac); break; } case CANCEL_COMMAND: { CancelCommand cc = (CancelCommand) ac; String clusterId = Long.toString(hostRoleCommandDAO.findByPK(cc.getTargetTaskId()).getStage().getClusterId()); - prepareExecutionCommandsClusters(executionCommandsClusters, hostName, clusterId); - executionCommandsClusters.get(hostName).get(clusterId).getCancelCommands().add(cc); + prepareExecutionCommandsClusters(executionCommandsClusters, hostId, clusterId); + executionCommandsClusters.get(hostId).get(clusterId).getCancelCommands().add(cc); break; } default: @@ -143,13 +143,13 @@ public class AgentCommandsPublisher { } } - private void prepareExecutionCommandsClusters(Map> executionCommandsClusters, - String hostName, String clusterId) { - if (!executionCommandsClusters.containsKey(hostName)) { - executionCommandsClusters.put(hostName, new TreeMap<>()); + private void prepareExecutionCommandsClusters(Map> executionCommandsClusters, + Long hostId, String clusterId) { + if (!executionCommandsClusters.containsKey(hostId)) { + executionCommandsClusters.put(hostId, new TreeMap<>()); } - if (!executionCommandsClusters.get(hostName).containsKey(clusterId)) { - executionCommandsClusters.get(hostName).put(clusterId, new ExecutionCommandsCluster(new ArrayList<>(), + if (!executionCommandsClusters.get(hostId).containsKey(clusterId)) { + executionCommandsClusters.get(hostId).put(clusterId, new ExecutionCommandsCluster(new ArrayList<>(), new ArrayList<>())); } }