ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpapirkovs...@apache.org
Subject [4/4] ambari git commit: AMBARI-22063. Poor performance of STOMP subscriptions cache and registration handling. (mpapirkovskyy)
Date Thu, 05 Oct 2017 10:46:39 GMT
AMBARI-22063. Poor performance of STOMP subscriptions cache and registration handling. (mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6ecac18c
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6ecac18c
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6ecac18c

Branch: refs/heads/branch-3.0-perf
Commit: 6ecac18cb5668ec3b74fe69751dc54d65f33f698
Parents: f0def7c
Author: Myroslav Papirkovskyi <mpapyrkovskyy@hortonworks.com>
Authored: Thu Oct 5 13:45:13 2017 +0300
Committer: Myroslav Papirkovskyi <mpapyrkovskyy@hortonworks.com>
Committed: Thu Oct 5 13:45:13 2017 +0300

----------------------------------------------------------------------
 .../server/HostNotRegisteredException.java      |   4 +-
 .../server/actionmanager/ActionScheduler.java   |  20 +-
 .../apache/ambari/server/agent/AgentReport.java |  53 ++
 .../server/agent/AgentReportsProcessor.java     | 100 ++++
 .../server/agent/AgentSessionManager.java       |  20 +-
 .../ambari/server/agent/HeartBeatHandler.java   | 121 +----
 .../ambari/server/agent/HeartbeatMonitor.java   |   2 +
 .../ambari/server/agent/HeartbeatProcessor.java |  49 +-
 .../agent/stomp/AgentClusterDataHolder.java     |  27 +-
 .../server/agent/stomp/AgentConfigsHolder.java  |  20 +-
 .../agent/stomp/AgentCurrentDataController.java |  19 +-
 .../server/agent/stomp/AgentHostDataHolder.java |  40 +-
 .../agent/stomp/AgentReportsController.java     |  24 +-
 .../agent/stomp/AgentsRegistrationQueue.java    |  76 +++
 .../agent/stomp/AlertDefinitionsHolder.java     |  40 +-
 .../agent/stomp/AmbariSubscriptionRegistry.java | 536 +++++++++++++++++++
 .../server/agent/stomp/HeartbeatController.java | 150 ++++--
 .../agent/stomp/HostLevelParamsHolder.java      |  10 +-
 .../server/agent/stomp/MetadataHolder.java      |   6 +-
 .../server/agent/stomp/dto/ClusterConfigs.java  |  16 +-
 .../ambari/server/agent/stomp/dto/Hash.java     |   6 +-
 .../server/agent/stomp/dto/MetadataCluster.java |   6 +-
 .../agent/stomp/dto/MetadataServiceInfo.java    |   6 +-
 .../server/agent/stomp/dto/TopologyCluster.java |   6 +-
 .../agent/stomp/dto/TopologyComponent.java      |   6 +-
 .../server/agent/stomp/dto/TopologyHost.java    |   6 +-
 .../server/configuration/Configuration.java     |  86 +++
 .../spring/AgentRegisteringQueueChecker.java    |  55 ++
 .../configuration/spring/AgentStompConfig.java  |   7 +-
 .../configuration/spring/GuiceBeansConfig.java  |  12 +
 .../configuration/spring/RootStompConfig.java   |  35 +-
 .../controller/AmbariManagementController.java  |   3 +
 .../AmbariManagementControllerImpl.java         |  49 +-
 .../ambari/server/controller/AmbariServer.java  |  13 +-
 .../internal/HostResourceProvider.java          |   8 +-
 .../state/DefaultServiceCalculatedState.java    |   2 +-
 .../state/FlumeServiceCalculatedState.java      |   2 +-
 .../state/HBaseServiceCalculatedState.java      |   2 +-
 .../state/HDFSServiceCalculatedState.java       |   2 +-
 .../state/HiveServiceCalculatedState.java       |   2 +-
 .../state/OozieServiceCalculatedState.java      |   2 +-
 .../state/YARNServiceCalculatedState.java       |   2 +-
 .../server/events/AgentConfigsUpdateEvent.java  |  20 +-
 .../events/AlertDefinitionsUpdateEvent.java     |  12 +-
 .../server/events/AmbariHostUpdateEvent.java    |   6 +-
 .../server/events/ConfigsUpdateEvent.java       |  39 --
 .../server/events/ExecutionCommandEvent.java    |  26 +-
 .../events/HostLevelParamsUpdateEvent.java      |  14 +-
 .../server/events/ServiceUpdateEvent.java       |   6 +-
 .../listeners/requests/StateUpdateListener.java |   6 +-
 .../services/ServiceUpdateListener.java         |  29 +-
 .../listeners/tasks/TaskStatusListener.java     |  33 +-
 .../publishers/AgentCommandsPublisher.java      |  48 +-
 .../BufferedUpdateEventPublisher.java           |  73 +++
 .../HostComponentUpdateEventPublisher.java      |  41 +-
 .../publishers/ServiceUpdateEventPublisher.java |  68 +++
 .../publishers/StateUpdateEventPublisher.java   |  11 +-
 .../orm/dao/HostComponentDesiredStateDAO.java   |  34 +-
 .../dao/ServiceComponentDesiredStateDAO.java    |  37 ++
 .../HostComponentDesiredStateEntity.java        |   8 +-
 .../orm/entities/HostComponentStateEntity.java  |  18 +
 .../ambari/server/state/ConfigHelper.java       |  40 +-
 .../org/apache/ambari/server/state/Host.java    |   8 +
 .../server/state/ServiceComponentHost.java      |   6 +
 .../state/ServiceComponentHostFactory.java      |   4 +
 .../server/state/ServiceComponentImpl.java      |  19 +-
 .../server/state/alert/AlertDefinitionHash.java |   3 +-
 .../server/state/cluster/ClusterImpl.java       |  24 +-
 .../ambari/server/state/host/HostImpl.java      | 115 +++-
 .../svccomphost/ServiceComponentHostImpl.java   |  84 ++-
 .../server/topology/TopologyDeleteFormer.java   |   7 +-
 .../server/upgrade/UpgradeCatalog300.java       |  29 +
 .../resources/Ambari-DDL-AzureDB-CREATE.sql     |   1 +
 .../main/resources/Ambari-DDL-Derby-CREATE.sql  |   1 +
 .../main/resources/Ambari-DDL-MySQL-CREATE.sql  |   1 +
 .../main/resources/Ambari-DDL-Oracle-CREATE.sql |   1 +
 .../resources/Ambari-DDL-Postgres-CREATE.sql    |   1 +
 .../resources/Ambari-DDL-SQLAnywhere-CREATE.sql |   1 +
 .../resources/Ambari-DDL-SQLServer-CREATE.sql   |   1 +
 .../actionmanager/TestActionScheduler.java      | 142 +++--
 .../server/agent/AgentSessionManagerTest.java   |  25 +-
 .../listeners/tasks/TaskStatusListenerTest.java |  23 +-
 .../server/upgrade/UpgradeCatalog300Test.java   |  11 +
 83 files changed, 2066 insertions(+), 661 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/HostNotRegisteredException.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/HostNotRegisteredException.java b/ambari-server/src/main/java/org/apache/ambari/server/HostNotRegisteredException.java
index eadd5f1..82d42cd 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/HostNotRegisteredException.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/HostNotRegisteredException.java
@@ -27,8 +27,8 @@ public class HostNotRegisteredException extends AmbariException {
     return new HostNotRegisteredException(String.format("Host with sessionId '%s' not registered", sessionId));
   }
 
-  public static HostNotRegisteredException forHostName(String hostName) {
-    return new HostNotRegisteredException(String.format("Host with hostName '%s' not registered", hostName));
+  public static HostNotRegisteredException forHostId(Long hostId) {
+    return new HostNotRegisteredException(String.format("Host with hostId '%s' not registered", hostId));
   }
 
   private HostNotRegisteredException(String message) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 595edcd..c41dd01 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -242,7 +242,7 @@ class ActionScheduler implements Runnable {
                             UnitOfWork unitOfWork, AmbariEventPublisher ambariEventPublisher,
                             Configuration configuration, Provider<EntityManager> entityManagerProvider,
                             HostRoleCommandDAO hostRoleCommandDAO, HostRoleCommandFactory hostRoleCommandFactory,
-                            RoleCommandOrderProvider roleCommandOrderProvider) {
+                            RoleCommandOrderProvider roleCommandOrderProvider, AgentCommandsPublisher agentCommandsPublisher) {
 
     sleepTime = sleepTimeMilliSec;
     actionTimeout = actionTimeoutMilliSec;
@@ -259,6 +259,7 @@ class ActionScheduler implements Runnable {
     this.hostRoleCommandFactory = hostRoleCommandFactory;
     jpaPublisher = null;
     this.roleCommandOrderProvider = roleCommandOrderProvider;
+    this.agentCommandsPublisher = agentCommandsPublisher;
 
     serverActionExecutor = new ServerActionExecutor(db, sleepTime);
     initializeCaches();
@@ -284,11 +285,12 @@ class ActionScheduler implements Runnable {
                             ActionQueue actionQueue, Clusters fsmObject, int maxAttempts, HostsMap hostsMap,
                             UnitOfWork unitOfWork, AmbariEventPublisher ambariEventPublisher,
                             Configuration configuration, Provider<EntityManager> entityManagerProvider,
-                            HostRoleCommandDAO hostRoleCommandDAO, HostRoleCommandFactory hostRoleCommandFactory) {
+                            HostRoleCommandDAO hostRoleCommandDAO, HostRoleCommandFactory hostRoleCommandFactory,
+                            AgentCommandsPublisher agentCommandsPublisher) {
 
     this(sleepTimeMilliSec, actionTimeoutMilliSec, db, actionQueue, fsmObject, maxAttempts, hostsMap, unitOfWork,
             ambariEventPublisher, configuration, entityManagerProvider, hostRoleCommandDAO, hostRoleCommandFactory,
-            null);
+            null, agentCommandsPublisher);
   }
 
   /**
@@ -456,7 +458,7 @@ class ActionScheduler implements Runnable {
 
         // Commands that will be scheduled in current scheduler wakeup
         List<ExecutionCommand> commandsToSchedule = new ArrayList<>();
-        Multimap<String, AgentCommand> commandsToEnqueue = ArrayListMultimap.create();
+        Multimap<Long, AgentCommand> commandsToEnqueue = ArrayListMultimap.create();
 
         Map<String, RoleStats> roleStats =
           processInProgressStage(stage, commandsToSchedule, commandsToEnqueue);
@@ -559,7 +561,7 @@ class ActionScheduler implements Runnable {
           if (Role.AMBARI_SERVER_ACTION.name().equals(cmd.getRole())) {
             serverActionExecutor.awake();
           } else {
-            commandsToEnqueue.put(cmd.getHostname(), cmd);
+            commandsToEnqueue.put(clusters.getHost(cmd.getHostname()).getHostId(), cmd);
           }
         }
         agentCommandsPublisher.sendAgentCommand(commandsToEnqueue);
@@ -746,7 +748,7 @@ class ActionScheduler implements Runnable {
    * whether stage has succeeded or failed
    */
   protected Map<String, RoleStats> processInProgressStage(Stage s, List<ExecutionCommand> commandsToSchedule,
-                                                          Multimap<String, AgentCommand> commandsToEnqueue) throws AmbariException {
+                                                          Multimap<Long, AgentCommand> commandsToEnqueue) throws AmbariException {
     LOG.debug("==> Collecting commands to schedule...");
     // Map to track role status
     Map<String, RoleStats> roleStats = initRoleStats(s);
@@ -1274,7 +1276,7 @@ class ActionScheduler implements Runnable {
           CancelCommand cancelCommand = new CancelCommand();
           cancelCommand.setTargetTaskId(hostRoleCommand.getTaskId());
           cancelCommand.setReason(reason);
-          agentCommandsPublisher.sendAgentCommand(hostRoleCommand.getHostName(), cancelCommand);
+          agentCommandsPublisher.sendAgentCommand(hostRoleCommand.getHostId(), cancelCommand);
         }
       }
 
@@ -1294,7 +1296,7 @@ class ActionScheduler implements Runnable {
     }
   }
 
-  void cancelCommandOnTimeout(Collection<HostRoleCommand> hostRoleCommands, Multimap<String, AgentCommand> commandsToEnqueue) {
+  void cancelCommandOnTimeout(Collection<HostRoleCommand> hostRoleCommands, Multimap<Long, AgentCommand> commandsToEnqueue) {
     for (HostRoleCommand hostRoleCommand : hostRoleCommands) {
       // There are no server actions in actionQueue
       if (!Role.AMBARI_SERVER_ACTION.equals(hostRoleCommand.getRole())) {
@@ -1303,7 +1305,7 @@ class ActionScheduler implements Runnable {
           CancelCommand cancelCommand = new CancelCommand();
           cancelCommand.setTargetTaskId(hostRoleCommand.getTaskId());
           cancelCommand.setReason("Stage timeout");
-          commandsToEnqueue.put(hostRoleCommand.getHostName(), cancelCommand);
+          commandsToEnqueue.put(hostRoleCommand.getHostId(), cancelCommand);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java
new file mode 100644
index 0000000..817a238
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import java.util.List;
+
+import org.apache.ambari.server.agent.stomp.dto.HostStatusReport;
+
+public class AgentReport {
+
+  private String hostName;
+  private List<ComponentStatus> componentStatuses;
+  private List<CommandReport> reports;
+  private HostStatusReport hostStatusReport;
+
+  public AgentReport(String hostName, List<ComponentStatus> componentStatuses, List<CommandReport> reports, HostStatusReport hostStatusReport) {
+    this.hostName = hostName;
+    this.componentStatuses = componentStatuses;
+    this.reports = reports;
+    this.hostStatusReport = hostStatusReport;
+  }
+
+  public String getHostName() {
+    return hostName;
+  }
+
+  public List<ComponentStatus> getComponentStatuses() {
+    return componentStatuses;
+  }
+
+  public List<CommandReport> getCommandReports() {
+    return reports;
+  }
+
+  public HostStatusReport getHostStatusReport() {
+    return hostStatusReport;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java
new file mode 100644
index 0000000..586a16e
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.google.inject.persist.UnitOfWork;
+
+@Singleton
+public class AgentReportsProcessor {
+  private static final Logger LOG = LoggerFactory.getLogger(AgentReportsProcessor.class);
+
+  private ScheduledExecutorService executor;
+
+  private ConcurrentLinkedQueue<AgentReport> agentReportsQueue = new ConcurrentLinkedQueue<>();
+
+  public void addAgentReport(AgentReport agentReport) {
+    agentReportsQueue.add(agentReport);
+  }
+
+  @Inject
+  private HeartBeatHandler hh;
+
+  @Inject
+  private UnitOfWork unitOfWork;
+
+  @Inject
+  private Configuration configuration;
+
+  public AgentReportsProcessor() {
+
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("agent-report-processor-%d").build();
+    int poolSize = configuration.getAgentsReportThreadPoolSize();
+    executor = Executors.newScheduledThreadPool(poolSize, threadFactory);
+    for (int i=0; i< poolSize; i++) {
+      executor.scheduleAtFixedRate(new AgentReportProcessingTask(),
+          configuration.getAgentsReportProcessingStartTimeout(),
+          configuration.getAgentsReportProcessingPeriod(), TimeUnit.SECONDS);
+    }
+  }
+
+  private class AgentReportProcessingTask implements Runnable {
+
+    @Override
+    public void run() {
+      try {
+        unitOfWork.begin();
+        while (true) {
+          AgentReport agentReport = agentReportsQueue.poll();
+          if (agentReport == null) {
+            break;
+          }
+          String hostName = agentReport.getHostName();
+          try {
+
+            //TODO rewrite with polymorphism usage.
+            if (agentReport.getCommandReports() != null) {
+              hh.handleCommandReportStatus(agentReport.getCommandReports(), hostName);
+            } else if (agentReport.getComponentStatuses() != null) {
+              hh.handleComponentReportStatus(agentReport.getComponentStatuses(), hostName);
+            } else if (agentReport.getHostStatusReport() != null) {
+              hh.handleHostReportStatus(agentReport.getHostStatusReport(), hostName);
+            }
+          } catch (AmbariException e) {
+            LOG.error("Error processing agent reports", e);
+          }
+        }
+      } finally {
+        unitOfWork.end();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java
index 3040f55..2f435bb 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java
@@ -30,14 +30,14 @@ import com.google.inject.Singleton;
 public class AgentSessionManager {
 
   private final ConcurrentMap<String, Host> registeredHosts = new ConcurrentHashMap<>(); // session ID -> host
-  private final ConcurrentMap<String, String> registeredSessionIds = new ConcurrentHashMap<>(); // hostname -> session ID
+  private final ConcurrentMap<Long, String> registeredSessionIds = new ConcurrentHashMap<>();
 
   public void register(String sessionId, Host host) {
     Preconditions.checkNotNull(sessionId);
     Preconditions.checkNotNull(host);
-    Preconditions.checkNotNull(host.getHostName());
+    Preconditions.checkNotNull(host.getHostId());
 
-    String oldSessionId = registeredSessionIds.put(host.getHostName(), sessionId);
+    String oldSessionId = registeredSessionIds.put(host.getHostId(), sessionId);
     if (oldSessionId != null) {
       registeredHosts.remove(oldSessionId);
     }
@@ -59,21 +59,21 @@ public class AgentSessionManager {
     throw HostNotRegisteredException.forSessionId(sessionId);
   }
 
-  public String getSessionId(String hostName) throws HostNotRegisteredException {
-    Preconditions.checkNotNull(hostName);
+  public String getSessionId(Long hostId) throws HostNotRegisteredException {
+    Preconditions.checkNotNull(hostId);
 
-    String sessionId = registeredSessionIds.get(hostName);
+    String sessionId = registeredSessionIds.get(hostId);
     if (sessionId != null) {
       return sessionId;
     }
 
-    throw HostNotRegisteredException.forHostName(hostName);
+    throw HostNotRegisteredException.forHostId(hostId);
   }
 
-  public void unregisterByHost(String hostName) {
-    Preconditions.checkNotNull(hostName);
+  public void unregisterByHost(Long hostId) {
+    Preconditions.checkNotNull(hostId);
 
-    String sessionId = registeredSessionIds.remove(hostName);
+    String sessionId = registeredSessionIds.remove(hostId);
     if (sessionId != null) {
       registeredHosts.remove(sessionId);
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 00d469f..ee6e05c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -17,11 +17,9 @@
  */
 package org.apache.ambari.server.agent;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
 
@@ -42,7 +40,6 @@ import org.apache.ambari.server.state.HostState;
 import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.ServiceComponentHost;
 import org.apache.ambari.server.state.StackId;
-import org.apache.ambari.server.state.alert.AlertDefinition;
 import org.apache.ambari.server.state.alert.AlertDefinitionHash;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import org.apache.ambari.server.state.host.HostHealthyHeartbeatEvent;
@@ -190,6 +187,15 @@ public class HeartBeatHandler {
     hostResponseIds.put(hostname, currentResponseId);
     hostResponses.put(hostname, response);
 
+    // If the host is waiting for component status updates, notify it
+    if (hostObject.getState().equals(HostState.WAITING_FOR_HOST_STATUS_UPDATES)) {
+      try {
+        LOG.debug("Got component status updates for host {}", hostname);
+        hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname, now));
+      } catch (InvalidStateTransitionException e) {
+        LOG.warn("Failed to notify the host {} about component status updates", hostname, e);
+      }
+    }
     if (heartbeat.getRecoveryReport() != null) {
       RecoveryReport rr = heartbeat.getRecoveryReport();
       processRecoveryReport(rr, hostname);
@@ -204,27 +210,6 @@ public class HeartBeatHandler {
       return createRegisterCommand();
     }
 
-    /*
-     * A host can belong to only one cluster. Though getClustersForHost(hostname)
-     * returns a set of clusters, it will have only one entry.
-     *
-     * TODO: Handle the case when a host is a part of multiple clusters.
-     */
-    Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
-
-    if (clusters.size() > 0) {
-      String clusterName = clusters.iterator().next().getClusterName();
-
-      if (recoveryConfigHelper.isConfigStale(clusterName, hostname, heartbeat.getRecoveryTimestamp())) {
-        RecoveryConfig rc = recoveryConfigHelper.getRecoveryConfig(clusterName, hostname);
-        response.setRecoveryConfig(rc);
-
-        if (response.getRecoveryConfig() != null) {
-          LOG.debug("Recovery configuration set to {}", response.getRecoveryConfig().toString());
-        }
-      }
-    }
-
     heartbeatProcessor.addHeartbeat(heartbeat);
 
     // Send commands if node is active
@@ -253,7 +238,7 @@ public class HeartBeatHandler {
     } catch (InvalidStateTransitionException ex) {
       LOG.warn("Asking agent to re-register due to " + ex.getMessage(), ex);
       host.setState(HostState.INIT);
-      agentSessionManager.unregisterByHost(hostname);
+      agentSessionManager.unregisterByHost(host.getHostId());
     }
   }
 
@@ -338,19 +323,11 @@ public class HeartBeatHandler {
     }
 
     // Resetting host state
-    hostObject.setState(HostState.INIT);
+    hostObject.setStateMachineState(HostState.INIT);
 
     // Set ping port for agent
     hostObject.setCurrentPingPort(currentPingPort);
 
-    // Get status of service components
-    List<StatusCommand> cmds = heartbeatMonitor.generateStatusCommands(hostname);
-
-    // Add request for component version
-    for (StatusCommand command: cmds) {
-      command.getCommandParams().put("request_version", String.valueOf(true));
-    }
-
     // Save the prefix of the log file paths
     hostObject.setPrefix(register.getPrefix());
 
@@ -360,43 +337,6 @@ public class HeartBeatHandler {
         register.getAgentEnv()));
 
     RegistrationResponse response = new RegistrationResponse();
-    if (cmds.isEmpty()) {
-      //No status commands needed let the fsm know that status step is done
-      hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname,
-          now));
-    }
-
-    response.setStatusCommands(cmds);
-
-    response.setResponseStatus(RegistrationStatus.OK);
-
-    // force the registering agent host to receive its list of alert definitions
-    List<AlertDefinitionCommand> alertDefinitionCommands = getRegistrationAlertDefinitionCommands(hostname);
-    response.setAlertDefinitionCommands(alertDefinitionCommands);
-
-    response.setAgentConfig(config.getAgentConfigsMap());
-    if(response.getAgentConfig() != null) {
-      LOG.debug("Agent configuration map set to {}", response.getAgentConfig());
-    }
-
-    /*
-     * A host can belong to only one cluster. Though getClustersForHost(hostname)
-     * returns a set of clusters, it will have only one entry.
-     *
-     * TODO: Handle the case when a host is a part of multiple clusters.
-     */
-    Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
-
-    if (clusters.size() > 0) {
-      String clusterName = clusters.iterator().next().getClusterName();
-
-      RecoveryConfig rc = recoveryConfigHelper.getRecoveryConfig(clusterName, hostname);
-      response.setRecoveryConfig(rc);
-
-      if(response.getRecoveryConfig() != null) {
-        LOG.info("Recovery configuration set to " + response.getRecoveryConfig());
-      }
-    }
 
     Long requestId = 0L;
     hostResponseIds.put(hostname, requestId);
@@ -463,45 +403,6 @@ public class HeartBeatHandler {
     return response;
   }
 
-  /**
-   * Gets the {@link AlertDefinitionCommand} instances that need to be sent for
-   * each cluster that the registering host is a member of.
-   *
-   * @param hostname
-   * @return
-   * @throws AmbariException
-   */
-  private List<AlertDefinitionCommand> getRegistrationAlertDefinitionCommands(
-      String hostname) throws AmbariException {
-
-    Set<Cluster> hostClusters = clusterFsm.getClustersForHost(hostname);
-    if (null == hostClusters || hostClusters.size() == 0) {
-      return null;
-    }
-
-    List<AlertDefinitionCommand> commands = new ArrayList<>();
-
-    // for every cluster this host is a member of, build the command
-    for (Cluster cluster : hostClusters) {
-      String clusterName = cluster.getClusterName();
-      alertDefinitionHash.invalidate(clusterName, hostname);
-
-      List<AlertDefinition> definitions = alertDefinitionHash.getAlertDefinitions(
-          clusterName, hostname);
-
-      String hash = alertDefinitionHash.getHash(clusterName, hostname);
-      Host host = cluster.getHost(hostname);
-      String publicHostName = host == null? hostname : host.getPublicHostName();
-      AlertDefinitionCommand command = new AlertDefinitionCommand(clusterName,
-          hostname, publicHostName, hash, definitions);
-
-      command.addConfigs(configHelper, cluster);
-      commands.add(command);
-    }
-
-    return commands;
-  }
-
   public void stop() {
     heartbeatMonitor.shutdown();
     heartbeatProcessor.stopAsync();

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
index 29db219..c5caf85 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
@@ -171,7 +171,9 @@ public class HeartbeatMonitor implements Runnable {
               !sch.getState().equals(State.UNINSTALLED) &&
               !sch.getState().equals(State.DISABLED)) {
               LOG.warn("Setting component state to UNKNOWN for component " + sc.getName() + " on " + host);
+              State oldState = sch.getState();
               sch.setState(State.UNKNOWN);
+              sch.setLastValidState(oldState);
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
index ef9b0f2..466b24c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
@@ -56,15 +56,12 @@ import org.apache.ambari.server.orm.dao.KerberosPrincipalHostDAO;
 import org.apache.ambari.server.state.Alert;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
-import org.apache.ambari.server.state.ComponentInfo;
 import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.HostHealthStatus;
 import org.apache.ambari.server.state.HostState;
-import org.apache.ambari.server.state.MaintenanceState;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.ServiceComponentHost;
-import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.UpgradeState;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import org.apache.ambari.server.state.host.HostStatusUpdatesReceivedEvent;
@@ -297,51 +294,7 @@ public class HeartbeatProcessor extends AbstractService{
       }
 
       if (calculateHostStatus) {
-        //Use actual component status to compute the host status
-        int masterCount = 0;
-        int mastersRunning = 0;
-        int slaveCount = 0;
-        int slavesRunning = 0;
-
-        Cluster cluster = clusterFsm.getCluster(clusterId);
-
-        List<ServiceComponentHost> scHosts = cluster.getServiceComponentHosts(hostName);
-        for (ServiceComponentHost scHost : scHosts) {
-          StackId stackId = scHost.getDesiredStackId();
-
-          ComponentInfo componentInfo =
-              ambariMetaInfo.getComponent(stackId.getStackName(),
-                  stackId.getStackVersion(), scHost.getServiceName(),
-                  scHost.getServiceComponentName());
-
-          String status = scHost.getState().name();
-
-          String category = componentInfo.getCategory();
-
-          if (MaintenanceState.OFF == maintenanceStateHelper.getEffectiveState(scHost, host)) {
-            if (category.equals("MASTER")) {
-              ++masterCount;
-              if (status.equals("STARTED")) {
-                ++mastersRunning;
-              }
-            } else if (category.equals("SLAVE")) {
-              ++slaveCount;
-              if (status.equals("STARTED")) {
-                ++slavesRunning;
-              }
-            }
-          }
-        }
-
-        if (masterCount == mastersRunning && slaveCount == slavesRunning) {
-          healthStatus = HostHealthStatus.HealthStatus.HEALTHY;
-        } else if (masterCount > 0 && mastersRunning < masterCount) {
-          healthStatus = HostHealthStatus.HealthStatus.UNHEALTHY;
-        } else {
-          healthStatus = HostHealthStatus.HealthStatus.ALERT;
-        }
-
-        host.setStatus(healthStatus.name());
+        host.calculateHostStatus(clusterId);
       }
 
       //If host doesn't belong to any cluster

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java
index 11f299c..f966386 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,6 +19,8 @@
 package org.apache.ambari.server.agent.stomp;
 
 import java.util.Objects;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.inject.Inject;
 
@@ -38,9 +40,17 @@ public abstract class AgentClusterDataHolder<T extends AmbariUpdateEvent & Hasha
 
   private T data;
 
+  //TODO perhaps need optimization
+  private Lock lock = new ReentrantLock();
+
   public T getUpdateIfChanged(String agentHash) throws AmbariException {
-    initializeDataIfNeeded(true);
-    return !Objects.equals(agentHash, data.getHash()) ? data : getEmptyData();
+    try {
+      lock.lock();
+      initializeDataIfNeeded(true);
+      return !Objects.equals(agentHash, data.getHash()) ? data : getEmptyData();
+    } finally {
+      lock.unlock();
+    }
   }
 
   /**
@@ -71,7 +81,12 @@ public abstract class AgentClusterDataHolder<T extends AmbariUpdateEvent & Hasha
   }
 
   protected final void regenerateHash() {
-    regenerateHash(data);
+    try {
+      lock.lock();
+      regenerateHash(data);
+    } finally {
+      lock.unlock();
+    }
   }
 
   protected final void initializeDataIfNeeded(boolean regenerateHash) throws AmbariException {

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java
index 54d8c23..50779ff 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java
@@ -40,28 +40,28 @@ public class AgentConfigsHolder extends AgentHostDataHolder<AgentConfigsUpdateEv
   private Provider<Clusters> clusters;
 
   @Override
-  public AgentConfigsUpdateEvent getCurrentData(String hostName) throws AmbariException {
-    return configHelper.getHostActualConfigs(hostName);
+  public AgentConfigsUpdateEvent getCurrentData(Long hostId) throws AmbariException {
+    return configHelper.getHostActualConfigs(hostId);
   }
 
   protected boolean handleUpdate(AgentConfigsUpdateEvent update) throws AmbariException {
-    setData(update, update.getHostName());
+    setData(update, update.getHostId());
     return true;
   }
 
-  public void updateData(Long clusterId, List<String> hostNames) throws AmbariException {
-    if (CollectionUtils.isEmpty(hostNames)) {
+  public void updateData(Long clusterId, List<Long> hostIds) throws AmbariException {
+    if (CollectionUtils.isEmpty(hostIds)) {
       // TODO cluster configs will be created before hosts assigning
       if (CollectionUtils.isEmpty(clusters.get().getCluster(clusterId).getHosts())) {
-        hostNames = clusters.get().getHosts().stream().map(Host::getHostName).collect(Collectors.toList());
+        hostIds = clusters.get().getHosts().stream().map(Host::getHostId).collect(Collectors.toList());
       } else {
-        hostNames = clusters.get().getCluster(clusterId).getHosts().stream().map(Host::getHostName).collect(Collectors.toList());
+        hostIds = clusters.get().getCluster(clusterId).getHosts().stream().map(Host::getHostId).collect(Collectors.toList());
       }
     }
 
-    for (String hostName : hostNames) {
-      AgentConfigsUpdateEvent agentConfigsUpdateEvent = configHelper.getHostActualConfigs(hostName);
-      agentConfigsUpdateEvent.setHostName(hostName);
+    for (Long hostId : hostIds) {
+      AgentConfigsUpdateEvent agentConfigsUpdateEvent = configHelper.getHostActualConfigs(hostId);
+      agentConfigsUpdateEvent.setHostId(hostId);
       updateData(agentConfigsUpdateEvent);
     }
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentCurrentDataController.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentCurrentDataController.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentCurrentDataController.java
index 5ea5f06..0a46ce1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentCurrentDataController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentCurrentDataController.java
@@ -29,7 +29,6 @@ import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import org.springframework.messaging.handler.annotation.Header;
 import org.springframework.messaging.handler.annotation.MessageMapping;
 import org.springframework.messaging.simp.annotation.SendToUser;
-import org.springframework.messaging.simp.annotation.SubscribeMapping;
 import org.springframework.stereotype.Controller;
 
 import com.google.inject.Injector;
@@ -55,30 +54,30 @@ public class AgentCurrentDataController {
     alertDefinitionsHolder = injector.getInstance(AlertDefinitionsHolder.class);
   }
 
-  @SubscribeMapping("/topologies")
+  @MessageMapping("/topologies")
   public TopologyUpdateEvent getCurrentTopology(Hash hash) throws AmbariException, InvalidStateTransitionException {
     return topologyHolder.getUpdateIfChanged(hash.getHash());
   }
 
-  @SubscribeMapping("/metadata")
+  @MessageMapping("/metadata")
   public MetadataUpdateEvent getCurrentMetadata(Hash hash) throws AmbariException {
     return metadataHolder.getUpdateIfChanged(hash.getHash());
   }
 
-  @SubscribeMapping("/alert_definitions")
+  @MessageMapping("/alert_definitions")
   public AlertDefinitionsUpdateEvent getAlertDefinitions(@Header String simpSessionId, Hash hash) throws AmbariException {
-    String hostName = agentSessionManager.getHost(simpSessionId).getHostName();
-    return alertDefinitionsHolder.getUpdateIfChanged(hash.getHash(), hostName);
+    Long hostId = agentSessionManager.getHost(simpSessionId).getHostId();
+    return alertDefinitionsHolder.getUpdateIfChanged(hash.getHash(), hostId);
   }
 
-  @SubscribeMapping("/configs")
+  @MessageMapping("/configs")
   public AgentConfigsUpdateEvent getCurrentConfigs(@Header String simpSessionId, Hash hash) throws AmbariException {
-    return agentConfigsHolder.getUpdateIfChanged(hash.getHash(), agentSessionManager.getHost(simpSessionId).getHostName());
+    return agentConfigsHolder.getUpdateIfChanged(hash.getHash(), agentSessionManager.getHost(simpSessionId).getHostId());
   }
 
-  @SubscribeMapping("/host_level_params")
+  @MessageMapping("/host_level_params")
   public HostLevelParamsUpdateEvent getCurrentHostLevelParams(@Header String simpSessionId, Hash hash) throws AmbariException {
-    return hostLevelParamsHolder.getUpdateIfChanged(hash.getHash(), agentSessionManager.getHost(simpSessionId).getHostName());
+    return hostLevelParamsHolder.getUpdateIfChanged(hash.getHash(), agentSessionManager.getHost(simpSessionId).getHostId());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java
index 746b755..7c540f9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java
@@ -38,24 +38,24 @@ public abstract class AgentHostDataHolder<T extends AmbariHostUpdateEvent & Hash
   @Inject
   private StateUpdateEventPublisher stateUpdateEventPublisher;
 
-  private final Map<String, T> data = new ConcurrentHashMap<>();
+  private final Map<Long, T> data = new ConcurrentHashMap<>();
 
-  protected abstract T getCurrentData(String hostName) throws AmbariException;
+  protected abstract T getCurrentData(Long hostId) throws AmbariException;
   protected abstract boolean handleUpdate(T update) throws AmbariException;
 
-  public T getUpdateIfChanged(String agentHash, String hostName) throws AmbariException {
-    T hostData = initializeDataIfNeeded(hostName, true);
+  public T getUpdateIfChanged(String agentHash, Long hostId) throws AmbariException {
+    T hostData = initializeDataIfNeeded(hostId, true);
     return !Objects.equals(agentHash, hostData.getHash()) ? hostData : getEmptyData();
   }
 
-  private T initializeDataIfNeeded(String hostName, boolean regenerateHash) throws AmbariException {
-    T hostData = data.get(hostName);
+  private T initializeDataIfNeeded(Long hostId, boolean regenerateHash) throws AmbariException {
+    T hostData = data.get(hostId);
     if (hostData == null) {
-      hostData = getCurrentData(hostName);
+      hostData = getCurrentData(hostId);
       if (regenerateHash) {
         regenerateHash(hostData);
       }
-      data.put(hostName, hostData);
+      data.put(hostId, hostData);
     }
     return hostData;
   }
@@ -65,9 +65,9 @@ public abstract class AgentHostDataHolder<T extends AmbariHostUpdateEvent & Hash
    * event to listeners.
    */
   public final void updateData(T update) throws AmbariException {
-    initializeDataIfNeeded(update.getHostName(), false);
+    initializeDataIfNeeded(update.getHostId(), false);
     if (handleUpdate(update)) {
-      T hostData = getData(update.getHostName());
+      T hostData = getData(update.getHostId());
       regenerateHash(hostData);
       update.setHash(hostData.getHash());
       stateUpdateEventPublisher.publish(update);
@@ -77,28 +77,28 @@ public abstract class AgentHostDataHolder<T extends AmbariHostUpdateEvent & Hash
   /**
    * Reset data for the given host.  Used if changes are complex and it's easier to re-create data from scratch.
    */
-  public final void resetData(String hostName) throws AmbariException {
-    T newData = getCurrentData(hostName);
-    data.replace(hostName, newData);
+  public final void resetData(Long hostId) throws AmbariException {
+    T newData = getCurrentData(hostId);
+    data.replace(hostId, newData);
     stateUpdateEventPublisher.publish(newData);
   }
 
   /**
    * Remove data for the given host.
    */
-  public final void onHostRemoved(String hostName) {
-    data.remove(hostName);
+  public final void onHostRemoved(String hostId) {
+    data.remove(hostId);
   }
 
-  public Map<String, T> getData() {
+  public Map<Long, T> getData() {
     return data;
   }
 
-  public T getData(String hostName) {
-    return data.get(hostName);
+  public T getData(Long hostId) {
+    return data.get(hostId);
   }
 
-  public void setData(T data, String hostName) {
-    this.data.put(hostName, data);
+  public void setData(T data, Long hostId) {
+    this.data.put(hostId, data);
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
index 5599254..ccfbc75 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
@@ -25,6 +25,8 @@ import java.util.Map;
 import javax.ws.rs.WebApplicationException;
 
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.agent.AgentReport;
+import org.apache.ambari.server.agent.AgentReportsProcessor;
 import org.apache.ambari.server.agent.AgentSessionManager;
 import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.agent.ComponentStatus;
@@ -40,7 +42,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.messaging.handler.annotation.Header;
 import org.springframework.messaging.handler.annotation.MessageMapping;
 import org.springframework.messaging.simp.annotation.SendToUser;
-import org.springframework.messaging.simp.annotation.SubscribeMapping;
 import org.springframework.stereotype.Controller;
 
 import com.google.inject.Injector;
@@ -52,13 +53,15 @@ public class AgentReportsController {
   private static final Logger LOG = LoggerFactory.getLogger(AgentReportsController.class);
   private final HeartBeatHandler hh;
   private final AgentSessionManager agentSessionManager;
+  private final AgentReportsProcessor agentReportsProcessor;
 
   public AgentReportsController(Injector injector) {
     hh = injector.getInstance(HeartBeatHandler.class);
     agentSessionManager = injector.getInstance(AgentSessionManager.class);
+    agentReportsProcessor = injector.getInstance(AgentReportsProcessor.class);
   }
 
-  @SubscribeMapping("/component_status")
+  @MessageMapping("/component_status")
   public void handleComponentReportStatus(@Header String simpSessionId, ComponentStatusReports message)
       throws WebApplicationException, InvalidStateTransitionException, AmbariException {
     List<ComponentStatus> statuses = new ArrayList<>();
@@ -73,11 +76,11 @@ public class AgentReportsController {
       }
     }
 
-    hh.handleComponentReportStatus(statuses,
-        agentSessionManager.getHost(simpSessionId).getHostName());
+    agentReportsProcessor.addAgentReport(new AgentReport(agentSessionManager.getHost(simpSessionId).getHostName(),
+        statuses, null, null));
   }
 
-  @SubscribeMapping("/commands_status")
+  @MessageMapping("/commands_status")
   public void handleCommandReportStatus(@Header String simpSessionId, CommandStatusReports message)
       throws WebApplicationException, InvalidStateTransitionException, AmbariException {
     List<CommandReport> statuses = new ArrayList<>();
@@ -85,16 +88,17 @@ public class AgentReportsController {
       statuses.addAll(clusterReport.getValue());
     }
 
-    hh.handleCommandReportStatus(statuses,
-        agentSessionManager.getHost(simpSessionId).getHostName());
+    agentReportsProcessor.addAgentReport(new AgentReport(agentSessionManager.getHost(simpSessionId).getHostName(),
+        null, statuses, null));
   }
 
-  @SubscribeMapping("/host_status")
+  @MessageMapping("/host_status")
   public void handleHostReportStatus(@Header String simpSessionId, HostStatusReport message) throws AmbariException {
-    hh.handleHostReportStatus(message, agentSessionManager.getHost(simpSessionId).getHostName());
+    agentReportsProcessor.addAgentReport(new AgentReport(agentSessionManager.getHost(simpSessionId).getHostName(),
+        null, null, message));
   }
 
-  @SubscribeMapping("/alerts_status")
+  @MessageMapping("/alerts_status")
   public void handleAlertsStatus(@Header String simpSessionId, Alert[] message) throws AmbariException {
     String hostName = agentSessionManager.getHost(simpSessionId).getHostName();
     List<Alert> alerts = Arrays.asList(message);

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentsRegistrationQueue.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentsRegistrationQueue.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentsRegistrationQueue.java
new file mode 100644
index 0000000..17518ad
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentsRegistrationQueue.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent.stomp;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.ambari.server.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Injector;
+
+/**
+ * Simultaneous processing a lot of registering/topology/metadata etc. requests from agents during
+ * agent registration can cause response timeout on agents' side. So it is allowed to process simultaneously requests
+ * only from limited number of agents with session ids from {@link registrationQueue}. Queue has limited capacity,
+ * session id can able be appeared in queue with agent connecting to server and releases with first heartbeat or disconnect from
+ * server.
+ */
+public class AgentsRegistrationQueue {
+  private static final Logger LOG = LoggerFactory.getLogger(AgentsRegistrationQueue.class);
+  private final BlockingQueue<String> registrationQueue;
+  private final ThreadFactory threadFactoryExecutor = new ThreadFactoryBuilder().setNameFormat("agents-queue-%d").build();
+  private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1, threadFactoryExecutor);
+
+  public AgentsRegistrationQueue(Injector injector) {
+    Configuration configuration = injector.getInstance(Configuration.class);
+    registrationQueue = new ArrayBlockingQueue<>(configuration.getAgentsRegistrationQueueSize());
+  }
+
+  public boolean offer(String sessionId) {
+    boolean offered = registrationQueue.offer(sessionId);
+    scheduledExecutorService.schedule(new CompleteJob(sessionId, registrationQueue), 60, TimeUnit.SECONDS);
+    return offered;
+  }
+
+  public void complete(String sessionId) {
+    registrationQueue.remove(sessionId);
+  }
+
+  private class CompleteJob implements Runnable {
+    private String sessionId;
+    private BlockingQueue<String> registrationQueue;
+
+    public CompleteJob(String sessionId, BlockingQueue<String> registrationQueue) {
+      this.sessionId = sessionId;
+      this.registrationQueue = registrationQueue;
+    }
+
+    @Override
+    public void run() {
+      registrationQueue.remove(sessionId);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AlertDefinitionsHolder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AlertDefinitionsHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AlertDefinitionsHolder.java
index 6c6bdd4..9c3f9b5 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AlertDefinitionsHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AlertDefinitionsHolder.java
@@ -67,9 +67,10 @@ public class AlertDefinitionsHolder extends AgentHostDataHolder<AlertDefinitions
   }
 
   @Override
-  protected AlertDefinitionsUpdateEvent getCurrentData(String hostName) throws AmbariException {
+  protected AlertDefinitionsUpdateEvent getCurrentData(Long hostId) throws AmbariException {
     Map<Long, AlertCluster> result = new TreeMap<>();
-    Map<Long, Map<Long, AlertDefinition>> alertDefinitions = helper.get().getAlertDefinitions(hostName);
+    Map<Long, Map<Long, AlertDefinition>> alertDefinitions = helper.get().getAlertDefinitions(hostId);
+    String hostName = clusters.get().getHostById(hostId).getHostName();
     long count = 0;
     for (Map.Entry<Long, Map<Long, AlertDefinition>> e : alertDefinitions.entrySet()) {
       Long clusterId = e.getKey();
@@ -78,7 +79,7 @@ public class AlertDefinitionsHolder extends AgentHostDataHolder<AlertDefinitions
       count += definitionMap.size();
     }
     LOG.info("Loaded {} alert definitions for {} clusters for host {}", count, result.size(), hostName);
-    return new AlertDefinitionsUpdateEvent(CREATE, result, hostName);
+    return new AlertDefinitionsUpdateEvent(CREATE, result, hostName, hostId);
   }
 
   @Override
@@ -93,9 +94,9 @@ public class AlertDefinitionsHolder extends AgentHostDataHolder<AlertDefinitions
       return false;
     }
 
-    String hostName = update.getHostName();
+    Long hostId = update.getHostId();
     boolean changed = false;
-    Map<Long, AlertCluster> existingClusters = getData(hostName).getClusters();
+    Map<Long, AlertCluster> existingClusters = getData(hostId).getClusters();
 
     switch (update.getEventType()) {
       case UPDATE:
@@ -106,7 +107,7 @@ public class AlertDefinitionsHolder extends AgentHostDataHolder<AlertDefinitions
         for (Map.Entry<Long, AlertCluster> e : updateClusters.entrySet()) {
           changed |= existingClusters.get(e.getKey()).handleUpdate(update.getEventType(), e.getValue());
         }
-        LOG.debug("Handled {} of alerts for {} cluster(s) on host {}, changed = {}", update.getEventType(), updateClusters.size(), hostName, changed);
+        LOG.debug("Handled {} of alerts for {} cluster(s) on host with id {}, changed = {}", update.getEventType(), updateClusters.size(), hostId, changed);
         break;
       case CREATE:
         if (!updateClusters.isEmpty()) {
@@ -127,25 +128,26 @@ public class AlertDefinitionsHolder extends AgentHostDataHolder<AlertDefinitions
   }
 
   @Subscribe
-  public void onAlertDefinitionRegistered(AlertDefinitionRegistrationEvent event) {
+  public void onAlertDefinitionRegistered(AlertDefinitionRegistrationEvent event) throws AmbariException {
     handleSingleDefinitionChange(UPDATE, event.getDefinition());
   }
 
   @Subscribe
-  public void onAlertDefinitionChanged(AlertDefinitionChangedEvent event) {
+  public void onAlertDefinitionChanged(AlertDefinitionChangedEvent event) throws AmbariException {
     handleSingleDefinitionChange(UPDATE, event.getDefinition());
   }
 
   @Subscribe
-  public void onAlertDefinitionDeleted(AlertDefinitionDeleteEvent event) {
+  public void onAlertDefinitionDeleted(AlertDefinitionDeleteEvent event) throws AmbariException {
     handleSingleDefinitionChange(DELETE, event.getDefinition());
   }
 
   @Subscribe
-  public void onServiceComponentInstalled(ServiceComponentInstalledEvent event) {
+  public void onServiceComponentInstalled(ServiceComponentInstalledEvent event) throws AmbariException {
     String hostName = event.getHostName();
     String serviceName = event.getServiceName();
     String componentName = event.getComponentName();
+    Long hostId = clusters.get().getHost(hostName).getHostId();
 
     Map<Long, AlertDefinition> definitions = helper.get().findByServiceComponent(event.getClusterId(), serviceName, componentName);
 
@@ -162,18 +164,19 @@ public class AlertDefinitionsHolder extends AgentHostDataHolder<AlertDefinitions
     }
 
     Map<Long, AlertCluster> map = Collections.singletonMap(event.getClusterId(), new AlertCluster(definitions, hostName));
-    safelyUpdateData(new AlertDefinitionsUpdateEvent(UPDATE, map, hostName));
+    safelyUpdateData(new AlertDefinitionsUpdateEvent(UPDATE, map, hostName, hostId));
   }
 
   @Subscribe
-  public void onServiceComponentUninstalled(ServiceComponentUninstalledEvent event) {
+  public void onServiceComponentUninstalled(ServiceComponentUninstalledEvent event) throws AmbariException {
     String hostName = event.getHostName();
+    Long hostId = clusters.get().getHost(hostName).getHostId();
     Map<Long, AlertDefinition> definitions = helper.get().findByServiceComponent(event.getClusterId(), event.getServiceName(), event.getComponentName());
     if (event.isMasterComponent()) {
       definitions.putAll(helper.get().findByServiceMaster(event.getClusterId(), event.getServiceName()));
     }
     Map<Long, AlertCluster> map = Collections.singletonMap(event.getClusterId(), new AlertCluster(definitions, hostName));
-    safelyUpdateData(new AlertDefinitionsUpdateEvent(DELETE, map, hostName));
+    safelyUpdateData(new AlertDefinitionsUpdateEvent(DELETE, map, hostName, hostId));
   }
 
   @Subscribe
@@ -191,20 +194,21 @@ public class AlertDefinitionsHolder extends AgentHostDataHolder<AlertDefinitions
     }
   }
 
-  private void safelyResetData(String hostName) {
+  private void safelyResetData(Long hostId) {
     try {
-      resetData(hostName);
+      resetData(hostId);
     } catch (AmbariException e) {
-      LOG.warn(String.format("Failed to reset alert definitions for host %s", hostName), e);
+      LOG.warn(String.format("Failed to reset alert definitions for host with id %s", hostId), e);
     }
   }
 
-  private void handleSingleDefinitionChange(AlertDefinitionsUpdateEvent.EventType eventType, AlertDefinition alertDefinition) {
+  private void handleSingleDefinitionChange(AlertDefinitionsUpdateEvent.EventType eventType, AlertDefinition alertDefinition) throws AmbariException {
     LOG.info("{} alert definition '{}'", eventType, alertDefinition);
     Set<String> hosts = helper.get().invalidateHosts(alertDefinition);
     for (String hostName : hosts) {
+      Long hostId = clusters.get().getHost(hostName).getHostId();
       Map<Long, AlertCluster> update = Collections.singletonMap(alertDefinition.getClusterId(), new AlertCluster(alertDefinition, hostName));
-      AlertDefinitionsUpdateEvent event = new AlertDefinitionsUpdateEvent(eventType, update, hostName);
+      AlertDefinitionsUpdateEvent event = new AlertDefinitionsUpdateEvent(eventType, update, hostName, hostId);
       safelyUpdateData(event);
     }
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
new file mode 100644
index 0000000..aaab7bf
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
@@ -0,0 +1,536 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.agent.stomp;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.expression.AccessException;
+import org.springframework.expression.EvaluationContext;
+import org.springframework.expression.Expression;
+import org.springframework.expression.ExpressionParser;
+import org.springframework.expression.PropertyAccessor;
+import org.springframework.expression.TypedValue;
+import org.springframework.expression.spel.SpelEvaluationException;
+import org.springframework.expression.spel.standard.SpelExpressionParser;
+import org.springframework.expression.spel.support.StandardEvaluationContext;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
+import org.springframework.messaging.simp.broker.AbstractSubscriptionRegistry;
+import org.springframework.messaging.simp.broker.SubscriptionRegistry;
+import org.springframework.messaging.support.MessageHeaderAccessor;
+import org.springframework.util.AntPathMatcher;
+import org.springframework.util.Assert;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+import org.springframework.util.PathMatcher;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * Implementation of {@link SubscriptionRegistry} that has configurable cache size, optimized working with cache and
+ * destinations matching.
+ */
+public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
+  private static final Logger LOG = LoggerFactory.getLogger(AmbariSubscriptionRegistry.class);
+
+  private PathMatcher pathMatcher = new AntPathMatcher();
+
+  private volatile int cacheLimit;
+
+  private String selectorHeaderName = "selector";
+
+  private volatile boolean selectorHeaderInUse = false;
+
+  private final ExpressionParser expressionParser = new SpelExpressionParser();
+
+  private final DestinationCache destinationCache;
+
+  private final SessionSubscriptionRegistry subscriptionRegistry = new SessionSubscriptionRegistry();
+
+  public AmbariSubscriptionRegistry(int cacheLimit) {
+    this.cacheLimit = cacheLimit;
+    destinationCache = new DestinationCache();
+  }
+
+  /**
+   * Specify the {@link PathMatcher} to use.
+   */
+  public void setPathMatcher(PathMatcher pathMatcher) {
+    this.pathMatcher = pathMatcher;
+  }
+
+  /**
+   * Return the configured {@link PathMatcher}.
+   */
+  public PathMatcher getPathMatcher() {
+    return this.pathMatcher;
+  }
+
+  /**
+   * Specify the maximum number of entries for the resolved destination cache.
+   * Default is 1024.
+   */
+  public void setCacheLimit(int cacheLimit) {
+    this.cacheLimit = cacheLimit;
+  }
+
+  /**
+   * Return the maximum number of entries for the resolved destination cache.
+   */
+  public int getCacheLimit() {
+    return this.cacheLimit;
+  }
+
+  /**
+   * Configure the name of a selector header that a subscription message can
+   * have in order to filter messages based on their headers. The value of the
+   * header can use Spring EL expressions against message headers.
+   * <p>For example the following expression expects a header called "foo" to
+   * have the value "bar":
+   * <pre>
+   * headers.foo == 'bar'
+   * </pre>
+   * <p>By default this is set to "selector".
+   * @since 4.2
+   */
+  public void setSelectorHeaderName(String selectorHeaderName) {
+    Assert.notNull(selectorHeaderName, "'selectorHeaderName' must not be null");
+    this.selectorHeaderName = selectorHeaderName;
+  }
+
+  /**
+   * Return the name for the selector header.
+   * @since 4.2
+   */
+  public String getSelectorHeaderName() {
+    return this.selectorHeaderName;
+  }
+
+
+  @Override
+  protected void addSubscriptionInternal(
+      String sessionId, String subsId, String destination, Message<?> message) {
+
+    Expression expression = null;
+    MessageHeaders headers = message.getHeaders();
+    String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers);
+    if (selector != null) {
+      try {
+        expression = this.expressionParser.parseExpression(selector);
+        this.selectorHeaderInUse = true;
+        if (logger.isTraceEnabled()) {
+          logger.trace("Subscription selector: [" + selector + "]");
+        }
+      }
+      catch (Throwable ex) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Failed to parse selector: " + selector, ex);
+        }
+      }
+    }
+    this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression);
+    this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId);
+  }
+
+  @Override
+  protected void removeSubscriptionInternal(String sessionId, String subsId, Message<?> message) {
+    SessionSubscriptionInfo info = this.subscriptionRegistry.getSubscriptions(sessionId);
+    if (info != null) {
+      String destination = info.removeSubscription(subsId);
+      if (destination != null) {
+        this.destinationCache.updateAfterRemovedSubscription(sessionId, subsId);
+      }
+    }
+  }
+
+  @Override
+  public void unregisterAllSubscriptions(String sessionId) {
+    SessionSubscriptionInfo info = this.subscriptionRegistry.removeSubscriptions(sessionId);
+    if (info != null) {
+      this.destinationCache.updateAfterRemovedSession(info);
+    }
+  }
+
+  @Override
+  protected MultiValueMap<String, String> findSubscriptionsInternal(String destination, Message<?> message) {
+    MultiValueMap<String, String> result = this.destinationCache.getSubscriptions(destination, message);
+    return filterSubscriptions(result, message);
+  }
+
+  private MultiValueMap<String, String> filterSubscriptions(
+      MultiValueMap<String, String> allMatches, Message<?> message) {
+
+    if (!this.selectorHeaderInUse) {
+      return allMatches;
+    }
+    EvaluationContext context = null;
+    MultiValueMap<String, String> result = new LinkedMultiValueMap<String, String>(allMatches.size());
+    for (String sessionId : allMatches.keySet()) {
+      for (String subId : allMatches.get(sessionId)) {
+        SessionSubscriptionInfo info = this.subscriptionRegistry.getSubscriptions(sessionId);
+        if (info == null) {
+          continue;
+        }
+        Subscription sub = info.getSubscription(subId);
+        if (sub == null) {
+          continue;
+        }
+        Expression expression = sub.getSelectorExpression();
+        if (expression == null) {
+          result.add(sessionId, subId);
+          continue;
+        }
+        if (context == null) {
+          context = new StandardEvaluationContext(message);
+          context.getPropertyAccessors().add(new SimpMessageHeaderPropertyAccessor());
+        }
+        try {
+          if (expression.getValue(context, boolean.class)) {
+            result.add(sessionId, subId);
+          }
+        }
+        catch (SpelEvaluationException ex) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("Failed to evaluate selector: " + ex.getMessage());
+          }
+        }
+        catch (Throwable ex) {
+          logger.debug("Failed to evaluate selector", ex);
+        }
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "DefaultSubscriptionRegistry[" + this.destinationCache + ", " + this.subscriptionRegistry + "]";
+  }
+
+
+  /**
+   * A cache for destinations previously resolved via
+   * {@link org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry#findSubscriptionsInternal(String, Message)}
+   */
+  private class DestinationCache {
+
+    /** Map from destination -> <sessionId, subscriptionId> for fast look-ups */
+    private final Map<String, LinkedMultiValueMap<String, String>> accessCache =
+        new ConcurrentHashMap<>(cacheLimit);
+
+    private final Cache<String, String> notSubscriptionCache =
+        CacheBuilder.newBuilder().maximumSize(cacheLimit).build();
+
+    public LinkedMultiValueMap<String, String> getSubscriptions(String destination, Message<?> message) {
+      if (notSubscriptionCache.asMap().keySet().contains(destination)) {
+        return new LinkedMultiValueMap<>();
+      }
+      LinkedMultiValueMap<String, String> subscriptions = this.accessCache.computeIfAbsent(destination, (key) -> {
+        LinkedMultiValueMap<String, String> result = new LinkedMultiValueMap<>();
+        for (SessionSubscriptionInfo info : subscriptionRegistry.getAllSubscriptions()) {
+          for (String destinationPattern : info.getDestinations()) {
+            //TODO temporary changed to more fast-acting check without regex, need move investigation
+            if (destinationPattern.equals(destination)) {
+              for (Subscription subscription : info.getSubscriptions(destinationPattern)) {
+                result.add(info.sessionId, subscription.getId());
+              }
+            }
+          }
+        }
+        if (!result.isEmpty()) {
+          return result;
+        } else {
+          notSubscriptionCache.put(destination, "");
+          return null;
+        }
+      });
+      return subscriptions == null ? new LinkedMultiValueMap<>() : subscriptions;
+    }
+
+    public void updateAfterNewSubscription(String destination, String sessionId, String subsId) {
+      this.accessCache.computeIfPresent(destination, (key, value) -> {
+        if (getPathMatcher().match(destination, key)) {
+          LinkedMultiValueMap<String, String> subs = value.deepCopy();
+          subs.add(sessionId, subsId);
+          return subs;
+        }
+        return value;
+      });
+    }
+
+    public void updateAfterRemovedSubscription(String sessionId, String subsId) {
+      for (Iterator<Map.Entry<String, LinkedMultiValueMap<String, String>>> iterator =
+           this.accessCache.entrySet().iterator(); iterator.hasNext(); ) {
+        Map.Entry<String, LinkedMultiValueMap<String, String>> entry = iterator.next();
+        String destination = entry.getKey();
+        LinkedMultiValueMap<String, String> sessionMap = entry.getValue();
+        List<String> subscriptions = sessionMap.get(sessionId);
+        if (subscriptions != null) {
+          subscriptions.remove(subsId);
+          if (subscriptions.isEmpty()) {
+            sessionMap.remove(sessionId);
+          }
+          if (sessionMap.isEmpty()) {
+            iterator.remove();
+          }
+          else {
+            this.accessCache.put(destination, sessionMap.deepCopy());
+          }
+        }
+      }
+    }
+
+    public void updateAfterRemovedSession(SessionSubscriptionInfo info) {
+      for (Iterator<Map.Entry<String, LinkedMultiValueMap<String, String>>> iterator =
+           this.accessCache.entrySet().iterator(); iterator.hasNext(); ) {
+        Map.Entry<String, LinkedMultiValueMap<String, String>> entry = iterator.next();
+        String destination = entry.getKey();
+        LinkedMultiValueMap<String, String> sessionMap = entry.getValue();
+        if (sessionMap.remove(info.getSessionId()) != null) {
+          if (sessionMap.isEmpty()) {
+            iterator.remove();
+          }
+          else {
+            this.accessCache.put(destination, sessionMap.deepCopy());
+          }
+        }
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "cache[" + this.accessCache.size() + " destination(s)]";
+    }
+  }
+
+
+  /**
+   * Provide access to session subscriptions by sessionId.
+   */
+  private static class SessionSubscriptionRegistry {
+
+    // sessionId -> SessionSubscriptionInfo
+    private final ConcurrentMap<String, SessionSubscriptionInfo> sessions =
+        new ConcurrentHashMap<String, SessionSubscriptionInfo>();
+
+    public SessionSubscriptionInfo getSubscriptions(String sessionId) {
+      return this.sessions.get(sessionId);
+    }
+
+    public Collection<SessionSubscriptionInfo> getAllSubscriptions() {
+      return this.sessions.values();
+    }
+
+    public SessionSubscriptionInfo addSubscription(String sessionId, String subscriptionId,
+                                                                                                                         String destination, Expression selectorExpression) {
+
+      SessionSubscriptionInfo info = this.sessions.get(sessionId);
+      if (info == null) {
+        info = new SessionSubscriptionInfo(sessionId);
+        SessionSubscriptionInfo value = this.sessions.putIfAbsent(sessionId, info);
+        if (value != null) {
+          info = value;
+        }
+      }
+      info.addSubscription(destination, subscriptionId, selectorExpression);
+      return info;
+    }
+
+    public SessionSubscriptionInfo removeSubscriptions(String sessionId) {
+      return this.sessions.remove(sessionId);
+    }
+
+    @Override
+    public String toString() {
+      return "registry[" + this.sessions.size() + " sessions]";
+    }
+  }
+
+
+  /**
+   * Hold subscriptions for a session.
+   */
+  private static class SessionSubscriptionInfo {
+
+    private final String sessionId;
+
+    // destination -> subscriptions
+    private final Map<String, Set<Subscription>> destinationLookup =
+        new ConcurrentHashMap<String, Set<Subscription>>(4);
+
+    public SessionSubscriptionInfo(String sessionId) {
+      Assert.notNull(sessionId, "'sessionId' must not be null");
+      this.sessionId = sessionId;
+    }
+
+    public String getSessionId() {
+      return this.sessionId;
+    }
+
+    public Set<String> getDestinations() {
+      return this.destinationLookup.keySet();
+    }
+
+    public Set<Subscription> getSubscriptions(String destination) {
+      return this.destinationLookup.get(destination);
+    }
+
+    public Subscription getSubscription(String subscriptionId) {
+      for (String destination : this.destinationLookup.keySet()) {
+        Set<Subscription> subs = this.destinationLookup.get(destination);
+        if (subs != null) {
+          for (Subscription sub : subs) {
+            if (sub.getId().equalsIgnoreCase(subscriptionId)) {
+              return sub;
+            }
+          }
+        }
+      }
+      return null;
+    }
+
+    public void addSubscription(String destination, String subscriptionId, Expression selectorExpression) {
+      Set<Subscription> subs = this.destinationLookup.get(destination);
+      if (subs == null) {
+        synchronized (this.destinationLookup) {
+          subs = this.destinationLookup.get(destination);
+          if (subs == null) {
+            subs = new CopyOnWriteArraySet<Subscription>();
+            this.destinationLookup.put(destination, subs);
+          }
+        }
+      }
+      subs.add(new Subscription(subscriptionId, selectorExpression));
+    }
+
+    public String removeSubscription(String subscriptionId) {
+      for (String destination : this.destinationLookup.keySet()) {
+        Set<Subscription> subs = this.destinationLookup.get(destination);
+        if (subs != null) {
+          for (Subscription sub : subs) {
+            if (sub.getId().equals(subscriptionId) && subs.remove(sub)) {
+              synchronized (this.destinationLookup) {
+                if (subs.isEmpty()) {
+                  this.destinationLookup.remove(destination);
+                }
+              }
+              return destination;
+            }
+          }
+        }
+      }
+      return null;
+    }
+
+    @Override
+    public String toString() {
+      return "[sessionId=" + this.sessionId + ", subscriptions=" + this.destinationLookup + "]";
+    }
+  }
+
+
+  private static final class Subscription {
+
+    private final String id;
+
+    private final Expression selectorExpression;
+
+    public Subscription(String id, Expression selector) {
+      Assert.notNull(id, "Subscription id must not be null");
+      this.id = id;
+      this.selectorExpression = selector;
+    }
+
+    public String getId() {
+      return this.id;
+    }
+
+    public Expression getSelectorExpression() {
+      return this.selectorExpression;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      return (this == other || (other instanceof Subscription && this.id.equals(((Subscription) other).id)));
+    }
+
+    @Override
+    public int hashCode() {
+      return this.id.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return "subscription(id=" + this.id + ")";
+    }
+  }
+
+
+  private static class SimpMessageHeaderPropertyAccessor implements PropertyAccessor {
+
+    @Override
+    public Class<?>[] getSpecificTargetClasses() {
+      return new Class<?>[] {MessageHeaders.class};
+    }
+
+    @Override
+    public boolean canRead(EvaluationContext context, Object target, String name) {
+      return true;
+    }
+
+    @Override
+    public TypedValue read(EvaluationContext context, Object target, String name) throws AccessException {
+      MessageHeaders headers = (MessageHeaders) target;
+      SimpMessageHeaderAccessor accessor =
+          MessageHeaderAccessor.getAccessor(headers, SimpMessageHeaderAccessor.class);
+      Object value;
+      if ("destination".equalsIgnoreCase(name)) {
+        value = accessor.getDestination();
+      }
+      else {
+        value = accessor.getFirstNativeHeader(name);
+        if (value == null) {
+          value = headers.get(name);
+        }
+      }
+      return new TypedValue(value);
+    }
+
+    @Override
+    public boolean canWrite(EvaluationContext context, Object target, String name) {
+      return false;
+    }
+
+    @Override
+    public void write(EvaluationContext context, Object target, String name, Object value) {
+    }
+  }
+
+}


Mime
View raw message