nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mcgil...@apache.org
Subject [11/18] nifi git commit: NIFI-1563: - Federate requests and merge responses from nodes instead of storing bulletins and stats at NCM - Updating UI to support restructured status history DTO. - Return 'Insufficient History' message if aggregate stats don't [...]
Date Mon, 04 Apr 2016 16:28:47 GMT
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index c0f4c63..303e98e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -29,8 +29,6 @@ import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
@@ -73,7 +71,6 @@ import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
-import org.apache.nifi.cluster.BulletinsPayload;
 import org.apache.nifi.cluster.HeartbeatPayload;
 import org.apache.nifi.cluster.context.ClusterContext;
 import org.apache.nifi.cluster.context.ClusterContextImpl;
@@ -88,6 +85,7 @@ import org.apache.nifi.cluster.manager.HttpClusterManager;
 import org.apache.nifi.cluster.manager.HttpRequestReplicator;
 import org.apache.nifi.cluster.manager.HttpResponseMapper;
 import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.StatusMerger;
 import org.apache.nifi.cluster.manager.exception.ConflictingNodeIdException;
 import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
 import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
@@ -109,7 +107,6 @@ import org.apache.nifi.cluster.node.Node.Status;
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
 import org.apache.nifi.cluster.protocol.Heartbeat;
-import org.apache.nifi.cluster.protocol.NodeBulletins;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.ProtocolHandler;
@@ -121,7 +118,6 @@ import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
 import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
@@ -130,7 +126,6 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.StandardFlowSerializer;
@@ -153,16 +148,14 @@ import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
 import org.apache.nifi.controller.state.SortedStateUtils;
 import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
-import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
 import org.apache.nifi.controller.status.history.MetricDescriptor;
-import org.apache.nifi.controller.status.history.StatusHistory;
+import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
+import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor;
+import org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor;
+import org.apache.nifi.controller.status.history.StandardStatusSnapshot;
 import org.apache.nifi.controller.status.history.StatusHistoryUtil;
 import org.apache.nifi.controller.status.history.StatusSnapshot;
-import org.apache.nifi.diagnostics.GarbageCollection;
-import org.apache.nifi.diagnostics.StorageUsage;
-import org.apache.nifi.diagnostics.SystemDiagnostics;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.BulletinFactory;
@@ -178,7 +171,6 @@ import org.apache.nifi.logging.NiFiLog;
 import org.apache.nifi.logging.ReportingTaskLogObserver;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
-import org.apache.nifi.nar.NarThreadContextClassLoader;
 import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.processor.StandardValidationContextFactory;
 import org.apache.nifi.remote.RemoteResourceManager;
@@ -188,7 +180,9 @@ import org.apache.nifi.remote.cluster.ClusterNodeInformation;
 import org.apache.nifi.remote.cluster.NodeInformation;
 import org.apache.nifi.remote.protocol.socket.ClusterManagerServerProtocol;
 import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinQuery;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.reporting.ReportingInitializationContext;
 import org.apache.nifi.reporting.ReportingTask;
@@ -202,14 +196,18 @@ import org.apache.nifi.util.ReflectionUtils;
 import org.apache.nifi.web.OptimisticLockingManager;
 import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.UpdateRevision;
+import org.apache.nifi.web.api.dto.BulletinBoardDTO;
+import org.apache.nifi.web.api.dto.BulletinDTO;
 import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.dto.CountersDTO;
 import org.apache.nifi.web.api.dto.DropRequestDTO;
 import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
 import org.apache.nifi.web.api.dto.ListingRequestDTO;
-import org.apache.nifi.web.api.dto.NodeDTO;
+import org.apache.nifi.web.api.dto.NodeCountersSnapshotDTO;
+import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsSnapshotDTO;
 import org.apache.nifi.web.api.dto.ProcessGroupDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.dto.QueueSizeDTO;
@@ -219,30 +217,53 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
 import org.apache.nifi.web.api.dto.ReportingTaskDTO;
 import org.apache.nifi.web.api.dto.StateEntryDTO;
 import org.apache.nifi.web.api.dto.StateMapDTO;
+import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceResultsDTO;
-import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
-import org.apache.nifi.web.api.dto.status.NodeStatusHistoryDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
+import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
+import org.apache.nifi.web.api.dto.status.NodeConnectionStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.NodePortStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.NodeProcessorStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.NodeStatusSnapshotsDTO;
+import org.apache.nifi.web.api.dto.status.PortStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
+import org.apache.nifi.web.api.entity.BulletinBoardEntity;
 import org.apache.nifi.web.api.entity.ComponentStateEntity;
+import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
 import org.apache.nifi.web.api.entity.ControllerServicesEntity;
+import org.apache.nifi.web.api.entity.ControllerStatusEntity;
+import org.apache.nifi.web.api.entity.CountersEntity;
 import org.apache.nifi.web.api.entity.DropRequestEntity;
 import org.apache.nifi.web.api.entity.FlowSnippetEntity;
 import org.apache.nifi.web.api.entity.ListingRequestEntity;
+import org.apache.nifi.web.api.entity.PortStatusEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
 import org.apache.nifi.web.api.entity.ProcessorsEntity;
 import org.apache.nifi.web.api.entity.ProvenanceEntity;
 import org.apache.nifi.web.api.entity.ProvenanceEventEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
 import org.apache.nifi.web.api.entity.ReportingTaskEntity;
 import org.apache.nifi.web.api.entity.ReportingTasksEntity;
+import org.apache.nifi.web.api.entity.StatusHistoryEntity;
+import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity;
 import org.apache.nifi.web.util.WebUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -310,10 +331,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
      */
     private static final int DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS = 5;
 
-    public static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = "org.apache.nifi.controller.status.history.VolatileComponentStatusRepository";
 
     public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors");
     public static final Pattern PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}");
+    public static final Pattern PROCESSOR_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/status");
     public static final Pattern PROCESSOR_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/state");
     public static final Pattern CLUSTER_PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/cluster/processors/[a-f0-9\\-]{36}");
 
@@ -321,6 +342,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     public static final Pattern REMOTE_PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}");
 
     public static final Pattern PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))");
+    public static final Pattern GROUP_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/status");
+    public static final Pattern CONTROLLER_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/status");
     public static final Pattern TEMPLATE_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/template-instance");
     public static final Pattern FLOW_SNIPPET_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/snippet-instance");
 
@@ -328,7 +351,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     public static final Pattern PROVENANCE_QUERY_URI = Pattern.compile("/nifi-api/controller/provenance/[a-f0-9\\-]{36}");
     public static final Pattern PROVENANCE_EVENT_URI = Pattern.compile("/nifi-api/controller/provenance/events/[0-9]+");
 
-    public static final Pattern COUNTERS_URI = Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}");
     public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node";
     public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}");
     public static final Pattern CONTROLLER_SERVICE_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/state");
@@ -336,6 +358,24 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node";
     public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
     public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}/state");
+    public static final Pattern BULLETIN_BOARD_URI_PATTERN = Pattern.compile("/nifi-api/controller/bulletin-board");
+    public static final Pattern SYSTEM_DIAGNOSTICS_URI_PATTERN = Pattern.compile("/nifi-api/system-diagnostics");
+    public static final Pattern COUNTERS_URI_PATTERN = Pattern.compile("/nifi-api/controller/counters");
+    public static final Pattern COUNTER_URI_PATTERN = Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}");
+
+    public static final Pattern PROCESSOR_STATUS_HISTORY_URI_PATTERN =
+        Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/status/history");
+    public static final Pattern PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/status/history");
+    public static final Pattern REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern
+        .compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}/status/history");
+    public static final Pattern CONNECTION_STATUS_HISTORY_URI_PATTERN = Pattern
+        .compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/status/history");
+
+    public static final Pattern CONNECTION_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/status");
+    public static final Pattern INPUT_PORT_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/input-ports/[a-f0-9\\-]{36}/status");
+    public static final Pattern OUTPUT_PORT_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/output-ports/[a-f0-9\\-]{36}/status");
+    public static final Pattern REMOTE_PROCESS_GROUP_STATUS_URI_PATTERN =
+        Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}/status");
 
     @Deprecated
     public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents");
@@ -378,7 +418,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     private final BulletinRepository bulletinRepository;
     private final String instanceId;
     private final FlowEngine reportingTaskEngine;
-    private final Map<NodeIdentifier, ComponentStatusRepository> componentMetricsRepositoryMap = new HashMap<>();
     private final StandardProcessScheduler processScheduler;
     private final StateManagerProvider stateManagerProvider;
     private final long componentStatusSnapshotMillis;
@@ -451,11 +490,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             throw new RuntimeException(e);
         }
 
-        processScheduler = new StandardProcessScheduler(new Heartbeater() {
-            @Override
-            public void heartbeat() {
-            }
-        }, this, encryptor, stateManagerProvider);
+        processScheduler = new StandardProcessScheduler(this, encryptor, stateManagerProvider);
 
         // When we construct the scheduling agents, we can pass null for a lot of the arguments because we are only
         // going to be scheduling Reporting Tasks. Otherwise, it would not be okay.
@@ -463,7 +498,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, new QuartzSchedulingAgent(null, reportingTaskEngine, null, encryptor));
         processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10);
         processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10);
-        processScheduler.scheduleFrameworkTask(new CaptureComponentMetrics(), "Capture Component Metrics", componentStatusSnapshotMillis, componentStatusSnapshotMillis, TimeUnit.MILLISECONDS);
 
         controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider);
     }
@@ -620,7 +654,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         return MessageType.CONNECTION_REQUEST == msg.getType()
                 || MessageType.HEARTBEAT == msg.getType()
                 || MessageType.CONTROLLER_STARTUP_FAILURE == msg.getType()
-                || MessageType.BULLETINS == msg.getType()
                 || MessageType.RECONNECTION_FAILURE == msg.getType();
     }
 
@@ -654,10 +687,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     }
                 }, "Handle Reconnection Failure Message from " + ((ReconnectionFailureMessage) protocolMessage).getNodeId()).start();
                 return null;
-            case BULLETINS:
-                final NodeBulletinsMessage bulletinsMessage = (NodeBulletinsMessage) protocolMessage;
-                handleBulletins(bulletinsMessage.getBulletins());
-                return null;
             default:
                 throw new ProtocolException("No handler defined for message type: " + protocolMessage.getType());
         }
@@ -1686,22 +1715,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         processScheduler.enableReportingTask(reportingTask);
     }
 
-    /**
-     * Handle a bulletins message.
-     *
-     * @param bulletins bulletins
-     */
-    public void handleBulletins(final NodeBulletins bulletins) {
-        final NodeIdentifier nodeIdentifier = bulletins.getNodeIdentifier();
-        final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort();
-
-        // unmarshal the message
-        final BulletinsPayload payload = BulletinsPayload.unmarshal(bulletins.getPayload());
-        for (final Bulletin bulletin : payload.getBulletins()) {
-            bulletin.setNodeAddress(nodeAddress);
-            bulletinRepository.addBulletin(bulletin);
-        }
-    }
 
     /**
      * Handles a node's heartbeat. If this heartbeat is a node's first heartbeat since its connection request, then the manager will mark the node as connected. If the node was previously disconnected
@@ -1875,20 +1888,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     }
 
 
-    private ComponentStatusRepository createComponentStatusRepository() {
-        final String implementationClassName = properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
-        if (implementationClassName == null) {
-            throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
-        }
-
-        try {
-            return NarThreadContextClassLoader.createInstance(implementationClassName, ComponentStatusRepository.class);
-        } catch (final Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     @Override
     public Set<Node> getNodes(final Status... statuses) {
         final Set<Status> desiredStatusSet = new HashSet<>();
@@ -2434,6 +2433,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         return false;
     }
 
+    private static boolean isProcessorStatusEndpoint(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && PROCESSOR_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
     private static boolean isProcessorStateEndpoint(final URI uri, final String method) {
         return "GET".equalsIgnoreCase(method) && PROCESSOR_STATE_URI_PATTERN.matcher(uri.getPath()).matches();
     }
@@ -2442,6 +2445,30 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         return ("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches();
     }
 
+    private static boolean isConnectionStatusEndpoint(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && CONNECTION_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    private static boolean isInputPortStatusEndpoint(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && INPUT_PORT_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    private static boolean isOutputPortStatusEndpoint(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && OUTPUT_PORT_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    private static boolean isRemoteProcessGroupStatusEndpoint(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUP_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    private static boolean isGroupStatusEndpoint(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && GROUP_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    private static boolean isControllerStatusEndpoint(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && CONTROLLER_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
     private static boolean isTemplateEndpoint(final URI uri, final String method) {
         return "POST".equalsIgnoreCase(method) && TEMPLATE_INSTANCE_URI_PATTERN.matcher(uri.getPath()).matches();
     }
@@ -2454,6 +2481,35 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         return "GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches();
     }
 
+    private static boolean isProcessorStatusHistoryEndpoint(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && PROCESSOR_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    private static boolean isProcessGroupStatusHistoryEndpoint(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    private static boolean isRemoteProcessGroupStatusHistoryEndpoint(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    private static boolean isConnectionStatusHistoryEndpoint(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && CONNECTION_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    private static boolean isBulletinBoardEndpoint(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && BULLETIN_BOARD_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    private static boolean isSystemDiagnosticsEndpoint(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && SYSTEM_DIAGNOSTICS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    private static boolean isCountersEndpoint(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && COUNTERS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+
     private static boolean isRemoteProcessGroupEndpoint(final URI uri, final String method) {
         if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && REMOTE_PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches()) {
             return true;
@@ -2487,8 +2543,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         return false;
     }
 
-    private static boolean isCountersEndpoint(final URI uri) {
-        return COUNTERS_URI.matcher(uri.getPath()).matches();
+    private static boolean isCounterEndpoint(final URI uri) {
+        return COUNTER_URI_PATTERN.matcher(uri.getPath()).matches();
     }
 
     private static boolean isControllerServicesEndpoint(final URI uri, final String method) {
@@ -2556,7 +2612,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 || isControllerServicesEndpoint(uri, method) || isControllerServiceEndpoint(uri, method)
                 || isControllerServiceReferenceEndpoint(uri, method) || isControllerServiceStateEndpoint(uri, method)
                 || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method) || isReportingTaskStateEndpoint(uri, method)
-                || isDropRequestEndpoint(uri, method) || isListFlowFilesEndpoint(uri, method);
+                || isDropRequestEndpoint(uri, method) || isListFlowFilesEndpoint(uri, method)
+                || isGroupStatusEndpoint(uri, method) || isProcessorStatusEndpoint(uri, method) || isControllerStatusEndpoint(uri, method)
+                || isConnectionStatusEndpoint(uri, method) || isRemoteProcessGroupStatusEndpoint(uri, method)
+                || isInputPortStatusEndpoint(uri, method) || isOutputPortStatusEndpoint(uri, method)
+                || isProcessorStatusHistoryEndpoint(uri, method) || isProcessGroupStatusHistoryEndpoint(uri, method)
+                || isRemoteProcessGroupStatusHistoryEndpoint(uri, method) || isConnectionStatusHistoryEndpoint(uri, method)
+                || isBulletinBoardEndpoint(uri, method) || isSystemDiagnosticsEndpoint(uri, method)
+                || isCountersEndpoint(uri, method);
     }
 
     private void mergeProcessorValidationErrors(final ProcessorDTO processor, Map<NodeIdentifier, ProcessorDTO> processorMap) {
@@ -2608,6 +2671,303 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         componentState.getLocalState().setState(localStateEntries);
     }
 
+
+    /**
+     * Merges the system diagnostics of every node in resultMap into target. A clone of target's aggregate snapshot is
+     * first recorded under selectedNodeId; the map entry whose value is target itself is not merged again.
+     */
+    private void mergeSystemDiagnostics(final SystemDiagnosticsDTO target, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, SystemDiagnosticsDTO> resultMap) {
+        target.setNodeSnapshots(new ArrayList<NodeSystemDiagnosticsSnapshotDTO>());
+
+        final NodeSystemDiagnosticsSnapshotDTO ownSnapshot = new NodeSystemDiagnosticsSnapshotDTO();
+        ownSnapshot.setSnapshot(target.getAggregateSnapshot().clone());
+        ownSnapshot.setNodeId(selectedNodeId.getId());
+        ownSnapshot.setAddress(selectedNodeId.getApiAddress());
+        ownSnapshot.setApiPort(selectedNodeId.getApiPort());
+        target.getNodeSnapshots().add(ownSnapshot);
+
+        for (final Map.Entry<NodeIdentifier, SystemDiagnosticsDTO> nodeEntry : resultMap.entrySet()) {
+            final SystemDiagnosticsDTO nodeDiagnostics = nodeEntry.getValue();
+            if (nodeDiagnostics != target) {
+                final NodeIdentifier id = nodeEntry.getKey();
+                StatusMerger.merge(target, nodeDiagnostics, id.getId(), id.getApiAddress(), id.getApiPort());
+            }
+        }
+    }
+
+    /**
+     * Merges the counters of every node in resultMap into target. A clone of target's aggregate snapshot is first
+     * recorded under selectedNodeId; the map entry whose value is target itself is not merged again.
+     */
+    private void mergeCounters(final CountersDTO target, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, CountersDTO> resultMap) {
+        target.setNodeSnapshots(new ArrayList<NodeCountersSnapshotDTO>());
+
+        final NodeCountersSnapshotDTO ownSnapshot = new NodeCountersSnapshotDTO();
+        ownSnapshot.setSnapshot(target.getAggregateSnapshot().clone());
+        ownSnapshot.setNodeId(selectedNodeId.getId());
+        ownSnapshot.setAddress(selectedNodeId.getApiAddress());
+        ownSnapshot.setApiPort(selectedNodeId.getApiPort());
+        target.getNodeSnapshots().add(ownSnapshot);
+
+        for (final Map.Entry<NodeIdentifier, CountersDTO> nodeEntry : resultMap.entrySet()) {
+            final CountersDTO nodeCounters = nodeEntry.getValue();
+            if (nodeCounters != target) {
+                final NodeIdentifier id = nodeEntry.getKey();
+                StatusMerger.merge(target, nodeCounters, id.getId(), id.getApiAddress(), id.getApiPort());
+            }
+        }
+    }
+
+    /**
+     * Merges the process group status of every node in resultMap into statusDto. A clone of statusDto's aggregate snapshot
+     * is first recorded under selectedNodeId. Each other node's remote process group authorization issues are prefixed
+     * with "[address:port] -- " before that node is merged; the entry whose value is statusDto itself is skipped.
+     */
+    private void mergeGroupStatus(final ProcessGroupStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, ProcessGroupStatusDTO> resultMap) {
+        statusDto.setNodeSnapshots(new ArrayList<NodeProcessGroupStatusSnapshotDTO>());
+        final NodeProcessGroupStatusSnapshotDTO selectedNodeSnapshot = new NodeProcessGroupStatusSnapshotDTO();
+        selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone());
+        selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress());
+        selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort());
+        selectedNodeSnapshot.setNodeId(selectedNodeId.getId());
+        statusDto.getNodeSnapshots().add(selectedNodeSnapshot);
+
+        for (final Map.Entry<NodeIdentifier, ProcessGroupStatusDTO> entry : resultMap.entrySet()) {
+            final NodeIdentifier nodeId = entry.getKey();
+            final ProcessGroupStatusDTO nodeProcessGroupStatus = entry.getValue();
+            if (nodeProcessGroupStatus == statusDto) {
+                continue;
+            }
+
+            final String nodePrefix = "[" + nodeId.getApiAddress() + ":" + nodeId.getApiPort() + "] -- "; // same for every issue of this node
+            for (final RemoteProcessGroupStatusSnapshotDTO remoteProcessGroupStatus : nodeProcessGroupStatus.getAggregateSnapshot().getRemoteProcessGroupStatusSnapshots()) {
+                final List<String> nodeAuthorizationIssues = remoteProcessGroupStatus.getAuthorizationIssues();
+                if (nodeAuthorizationIssues != null && !nodeAuthorizationIssues.isEmpty()) { // tolerate a node that reports no issues list
+                    for (final ListIterator<String> iter = nodeAuthorizationIssues.listIterator(); iter.hasNext();) {
+                        iter.set(nodePrefix + iter.next());
+                    }
+                    remoteProcessGroupStatus.setAuthorizationIssues(nodeAuthorizationIssues);
+                }
+            }
+            StatusMerger.merge(statusDto, nodeProcessGroupStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort());
+        }
+    }
+
+
+    /**
+     * Merges the processor status of every node in resultMap into statusDto. A clone of statusDto's aggregate snapshot
+     * is first recorded under selectedNodeId; the map entry whose value is statusDto itself is not merged again.
+     */
+    private void mergeProcessorStatus(final ProcessorStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, ProcessorStatusDTO> resultMap) {
+        statusDto.setNodeSnapshots(new ArrayList<NodeProcessorStatusSnapshotDTO>());
+
+        final NodeProcessorStatusSnapshotDTO ownSnapshot = new NodeProcessorStatusSnapshotDTO();
+        ownSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone());
+        ownSnapshot.setNodeId(selectedNodeId.getId());
+        ownSnapshot.setAddress(selectedNodeId.getApiAddress());
+        ownSnapshot.setApiPort(selectedNodeId.getApiPort());
+        statusDto.getNodeSnapshots().add(ownSnapshot);
+
+        // fold in every node except the one whose response is statusDto
+        for (final Map.Entry<NodeIdentifier, ProcessorStatusDTO> nodeEntry : resultMap.entrySet()) {
+            final ProcessorStatusDTO nodeStatus = nodeEntry.getValue();
+            if (nodeStatus != statusDto) {
+                final NodeIdentifier id = nodeEntry.getKey();
+                StatusMerger.merge(statusDto, nodeStatus, id.getId(), id.getApiAddress(), id.getApiPort());
+            }
+        }
+    }
+
+    /**
+     * Merges the connection status of every node in resultMap into statusDto. A clone of statusDto's aggregate snapshot
+     * is first recorded under selectedNodeId; the map entry whose value is statusDto itself is not merged again.
+     */
+    private void mergeConnectionStatus(final ConnectionStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, ConnectionStatusDTO> resultMap) {
+        statusDto.setNodeSnapshots(new ArrayList<NodeConnectionStatusSnapshotDTO>());
+
+        final NodeConnectionStatusSnapshotDTO ownSnapshot = new NodeConnectionStatusSnapshotDTO();
+        ownSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone());
+        ownSnapshot.setNodeId(selectedNodeId.getId());
+        ownSnapshot.setAddress(selectedNodeId.getApiAddress());
+        ownSnapshot.setApiPort(selectedNodeId.getApiPort());
+        statusDto.getNodeSnapshots().add(ownSnapshot);
+
+        // fold in every node except the one whose response is statusDto
+        for (final Map.Entry<NodeIdentifier, ConnectionStatusDTO> nodeEntry : resultMap.entrySet()) {
+            final ConnectionStatusDTO nodeStatus = nodeEntry.getValue();
+            if (nodeStatus != statusDto) {
+                final NodeIdentifier id = nodeEntry.getKey();
+                StatusMerger.merge(statusDto, nodeStatus, id.getId(), id.getApiAddress(), id.getApiPort());
+            }
+        }
+    }
+
+    /**
+     * Merges the port status of every node in resultMap into statusDto. A clone of statusDto's aggregate snapshot is
+     * first recorded under selectedNodeId; the map entry whose value is statusDto itself is not merged again.
+     */
+    private void mergePortStatus(final PortStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, PortStatusDTO> resultMap) {
+        statusDto.setNodeSnapshots(new ArrayList<NodePortStatusSnapshotDTO>());
+
+        final NodePortStatusSnapshotDTO ownSnapshot = new NodePortStatusSnapshotDTO();
+        ownSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone());
+        ownSnapshot.setNodeId(selectedNodeId.getId());
+        ownSnapshot.setAddress(selectedNodeId.getApiAddress());
+        ownSnapshot.setApiPort(selectedNodeId.getApiPort());
+        statusDto.getNodeSnapshots().add(ownSnapshot);
+
+        // fold in every node except the one whose response is statusDto
+        for (final Map.Entry<NodeIdentifier, PortStatusDTO> nodeEntry : resultMap.entrySet()) {
+            final PortStatusDTO nodeStatus = nodeEntry.getValue();
+            if (nodeStatus != statusDto) {
+                final NodeIdentifier id = nodeEntry.getKey();
+                StatusMerger.merge(statusDto, nodeStatus, id.getId(), id.getApiAddress(), id.getApiPort());
+            }
+        }
+    }
+
+    /**
+     * Merges the remote process group status of every node in resultMap into statusDto. A clone of statusDto's aggregate
+     * snapshot is first recorded under selectedNodeId; the map entry whose value is statusDto itself is not merged again.
+     */
+    private void mergeRemoteProcessGroupStatus(final RemoteProcessGroupStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, RemoteProcessGroupStatusDTO> resultMap) {
+        statusDto.setNodeSnapshots(new ArrayList<NodeRemoteProcessGroupStatusSnapshotDTO>());
+
+        final NodeRemoteProcessGroupStatusSnapshotDTO ownSnapshot = new NodeRemoteProcessGroupStatusSnapshotDTO();
+        ownSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone());
+        ownSnapshot.setNodeId(selectedNodeId.getId());
+        ownSnapshot.setAddress(selectedNodeId.getApiAddress());
+        ownSnapshot.setApiPort(selectedNodeId.getApiPort());
+        statusDto.getNodeSnapshots().add(ownSnapshot);
+
+        // fold in every node except the one whose response is statusDto
+        for (final Map.Entry<NodeIdentifier, RemoteProcessGroupStatusDTO> nodeEntry : resultMap.entrySet()) {
+            final RemoteProcessGroupStatusDTO nodeStatus = nodeEntry.getValue();
+            if (nodeStatus != statusDto) {
+                final NodeIdentifier id = nodeEntry.getKey();
+                StatusMerger.merge(statusDto, nodeStatus, id.getId(), id.getApiAddress(), id.getApiPort());
+            }
+        }
+    }
+
+    /**
+     * Merges the controller status of every node in resultMap into statusDto. All node bulletins, including those of statusDto
+     * itself, are stamped with "address:port"; bulletins from getBulletinRepository() are then appended and the node counts set.
+     */
+    private void mergeControllerStatus(final ControllerStatusDTO statusDto, final Map<NodeIdentifier, ControllerStatusDTO> resultMap) {
+        for (final Map.Entry<NodeIdentifier, ControllerStatusDTO> nodeEntry : resultMap.entrySet()) {
+            final NodeIdentifier id = nodeEntry.getKey();
+            final ControllerStatusDTO nodeStatus = nodeEntry.getValue();
+            final String address = id.getApiAddress() + ":" + id.getApiPort();
+            for (final BulletinDTO controllerBulletin : nodeStatus.getBulletins()) {
+                controllerBulletin.setNodeAddress(address);
+            }
+            for (final BulletinDTO serviceBulletin : nodeStatus.getControllerServiceBulletins()) {
+                serviceBulletin.setNodeAddress(address);
+            }
+            for (final BulletinDTO taskBulletin : nodeStatus.getReportingTaskBulletins()) {
+                taskBulletin.setNodeAddress(address);
+            }
+            if (nodeStatus != statusDto) {
+                StatusMerger.merge(statusDto, nodeStatus);
+            }
+        }
+
+        final int totalNodeCount = getNodeIds().size();
+        final int connectedNodeCount = getNodeIds(Status.CONNECTED).size();
+
+        // append the controller-level bulletins from the local bulletin repository
+        final List<Bulletin> localControllerBulletins = getBulletinRepository().findBulletinsForController();
+        statusDto.setBulletins(mergeNCMBulletins(statusDto.getBulletins(), localControllerBulletins));
+
+        // append the controller service bulletins
+        final BulletinQuery serviceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build();
+        final List<Bulletin> localServiceBulletins = getBulletinRepository().findBulletins(serviceQuery);
+        statusDto.setControllerServiceBulletins(mergeNCMBulletins(statusDto.getControllerServiceBulletins(), localServiceBulletins));
+
+        // append the reporting task bulletins
+        final BulletinQuery taskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build();
+        final List<Bulletin> localTaskBulletins = getBulletinRepository().findBulletins(taskQuery);
+        statusDto.setReportingTaskBulletins(mergeNCMBulletins(statusDto.getReportingTaskBulletins(), localTaskBulletins));
+
+        statusDto.setTotalNodeCount(totalNodeCount);
+        statusDto.setConnectedNodeCount(connectedNodeCount);
+        StatusMerger.updatePrettyPrintedFields(statusDto);
+    }
+
+    /** Returns nodeBulletins itself when ncmBulletins is null or empty; otherwise a new list of nodeBulletins followed by DTOs for ncmBulletins. */
+    private List<BulletinDTO> mergeNCMBulletins(final List<BulletinDTO> nodeBulletins, final List<Bulletin> ncmBulletins) {
+        if (ncmBulletins != null && !ncmBulletins.isEmpty()) {
+            final List<BulletinDTO> combined = new ArrayList<>(nodeBulletins.size() + ncmBulletins.size());
+            combined.addAll(nodeBulletins);
+            combined.addAll(createBulletinDtos(ncmBulletins));
+            return combined;
+        }
+        return nodeBulletins;
+    }
+
+    /**
+     * Replaces nodeBulletinBoard's bulletins with the bulletins of every board in resultMap, each stamped with its
+     * node's "address:port" and sorted by timestamp, then by node address.
+     */
+    private void mergeBulletinBoard(final BulletinBoardDTO nodeBulletinBoard, final Map<NodeIdentifier, BulletinBoardDTO> resultMap) {
+        final List<BulletinDTO> allBulletins = new ArrayList<>();
+        for (final Map.Entry<NodeIdentifier, BulletinBoardDTO> nodeEntry : resultMap.entrySet()) {
+            final NodeIdentifier id = nodeEntry.getKey();
+            final BulletinBoardDTO nodeBoard = nodeEntry.getValue();
+            final String address = id.getApiAddress() + ":" + id.getApiPort();
+            for (final BulletinDTO nodeBulletin : nodeBoard.getBulletins()) {
+                nodeBulletin.setNodeAddress(address);
+                allBulletins.add(nodeBulletin);
+            }
+        }
+
+        Collections.sort(allBulletins, new Comparator<BulletinDTO>() {
+            @Override
+            public int compare(final BulletinDTO first, final BulletinDTO second) {
+                // equal timestamps fall back to the node address
+                final int byTime = first.getTimestamp().compareTo(second.getTimestamp());
+                return byTime != 0 ? byTime : first.getNodeAddress().compareTo(second.getNodeAddress());
+            }
+        });
+
+        nodeBulletinBoard.setBulletins(allBulletins);
+    }
+
+    /**
+     * Converts each of the specified Bulletins into a BulletinDTO, in iteration order.
+     * @param bulletins the bulletins to convert
+     * @return a new list holding one dto per bulletin
+     */
+    public List<BulletinDTO> createBulletinDtos(final List<Bulletin> bulletins) {
+        final List<BulletinDTO> dtos = new ArrayList<>(bulletins.size());
+        for (final Bulletin source : bulletins) {
+            final BulletinDTO converted = createBulletinDto(source);
+            dtos.add(converted);
+        }
+        return dtos;
+    }
+
+    /**
+     * Copies the fields of the specified Bulletin into a new BulletinDTO.
+     *
+     * @param bulletin the bulletin to copy from
+     * @return a dto carrying the bulletin's id, timestamp, level, category, message, source, group and node address
+     */
+    public BulletinDTO createBulletinDto(final Bulletin bulletin) {
+        final BulletinDTO bulletinDto = new BulletinDTO();
+        bulletinDto.setId(bulletin.getId());
+        bulletinDto.setTimestamp(bulletin.getTimestamp());
+        bulletinDto.setLevel(bulletin.getLevel());
+        bulletinDto.setCategory(bulletin.getCategory());
+        bulletinDto.setMessage(bulletin.getMessage());
+        bulletinDto.setSourceId(bulletin.getSourceId());
+        bulletinDto.setSourceName(bulletin.getSourceName());
+        bulletinDto.setGroupId(bulletin.getGroupId());
+        bulletinDto.setNodeAddress(bulletin.getNodeAddress());
+        return bulletinDto;
+    }
+
     private void mergeProvenanceQueryResults(final ProvenanceDTO provenanceDto, final Map<NodeIdentifier, ProvenanceDTO> resultMap, final Set<NodeResponse> problematicResponses) {
         final ProvenanceResultsDTO results = provenanceDto.getResults();
         final ProvenanceRequestDTO request = provenanceDto.getRequest();
@@ -3545,6 +3905,252 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             mergeComponentState(componentState, resultsMap);
 
             clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && isGroupStatusEndpoint(uri, method)) {
+            final ProcessGroupStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessGroupStatusEntity.class);
+            final ProcessGroupStatusDTO statusRequest = responseEntity.getProcessGroupStatus();
+
+            NodeIdentifier nodeIdentifier = null;
+
+            final Map<NodeIdentifier, ProcessGroupStatusDTO> resultsMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final ProcessGroupStatusEntity nodeResponseEntity;
+                if (nodeResponse == clientResponse) {
+                    nodeIdentifier = nodeResponse.getNodeId();
+                    nodeResponseEntity = responseEntity;
+                } else {
+                    nodeResponseEntity = nodeResponse.getClientResponse().getEntity(ProcessGroupStatusEntity.class);
+                }
+
+                final ProcessGroupStatusDTO nodeStatus = nodeResponseEntity.getProcessGroupStatus();
+                resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+            }
+            mergeGroupStatus(statusRequest, nodeIdentifier, resultsMap);
+
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && isProcessorStatusEndpoint(uri, method)) {
+            final ProcessorStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessorStatusEntity.class);
+            final ProcessorStatusDTO statusRequest = responseEntity.getProcessorStatus();
+
+            NodeIdentifier nodeIdentifier = null;
+
+            final Map<NodeIdentifier, ProcessorStatusDTO> resultsMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final ProcessorStatusEntity nodeResponseEntity;
+                if (nodeResponse == clientResponse) {
+                    nodeIdentifier = nodeResponse.getNodeId();
+                    nodeResponseEntity = responseEntity;
+                } else {
+                    nodeResponseEntity = nodeResponse.getClientResponse().getEntity(ProcessorStatusEntity.class);
+                }
+
+                final ProcessorStatusDTO nodeStatus = nodeResponseEntity.getProcessorStatus();
+                resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+            }
+            mergeProcessorStatus(statusRequest, nodeIdentifier, resultsMap);
+
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && isConnectionStatusEndpoint(uri, method)) {
+            final ConnectionStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(ConnectionStatusEntity.class);
+            final ConnectionStatusDTO statusRequest = responseEntity.getConnectionStatus();
+
+            NodeIdentifier nodeIdentifier = null;
+
+            final Map<NodeIdentifier, ConnectionStatusDTO> resultsMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final ConnectionStatusEntity nodeResponseEntity;
+                if (nodeResponse == clientResponse) {
+                    nodeIdentifier = nodeResponse.getNodeId();
+                    nodeResponseEntity = responseEntity;
+                } else {
+                    nodeResponseEntity = nodeResponse.getClientResponse().getEntity(ConnectionStatusEntity.class);
+                }
+
+                final ConnectionStatusDTO nodeStatus = nodeResponseEntity.getConnectionStatus();
+                resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+            }
+            mergeConnectionStatus(statusRequest, nodeIdentifier, resultsMap);
+
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && (isInputPortStatusEndpoint(uri, method) || isOutputPortStatusEndpoint(uri, method))) {
+            final PortStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(PortStatusEntity.class);
+            final PortStatusDTO statusRequest = responseEntity.getPortStatus();
+
+            NodeIdentifier nodeIdentifier = null;
+
+            final Map<NodeIdentifier, PortStatusDTO> resultsMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final PortStatusEntity nodeResponseEntity;
+                if (nodeResponse == clientResponse) {
+                    nodeIdentifier = nodeResponse.getNodeId();
+                    nodeResponseEntity = responseEntity;
+                } else {
+                    nodeResponseEntity = nodeResponse.getClientResponse().getEntity(PortStatusEntity.class);
+                }
+
+                final PortStatusDTO nodeStatus = nodeResponseEntity.getPortStatus();
+                resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+            }
+            mergePortStatus(statusRequest, nodeIdentifier, resultsMap);
+
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && isRemoteProcessGroupStatusEndpoint(uri, method)) {
+            final RemoteProcessGroupStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(RemoteProcessGroupStatusEntity.class);
+            final RemoteProcessGroupStatusDTO statusRequest = responseEntity.getRemoteProcessGroupStatus();
+
+            NodeIdentifier nodeIdentifier = null;
+
+            final Map<NodeIdentifier, RemoteProcessGroupStatusDTO> resultsMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final RemoteProcessGroupStatusEntity nodeResponseEntity;
+                if (nodeResponse == clientResponse) {
+                    nodeIdentifier = nodeResponse.getNodeId();
+                    nodeResponseEntity = responseEntity;
+                } else {
+                    nodeResponseEntity = nodeResponse.getClientResponse().getEntity(RemoteProcessGroupStatusEntity.class);
+                }
+
+                final RemoteProcessGroupStatusDTO nodeStatus = nodeResponseEntity.getRemoteProcessGroupStatus();
+                resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+            }
+            mergeRemoteProcessGroupStatus(statusRequest, nodeIdentifier, resultsMap);
+
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && isControllerStatusEndpoint(uri, method)) {
+            final ControllerStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerStatusEntity.class);
+            final ControllerStatusDTO statusRequest = responseEntity.getControllerStatus();
+
+            final Map<NodeIdentifier, ControllerStatusDTO> resultsMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final ControllerStatusEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerStatusEntity.class);
+                final ControllerStatusDTO nodeStatus = nodeResponseEntity.getControllerStatus();
+
+                resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+            }
+            mergeControllerStatus(statusRequest, resultsMap);
+
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && isBulletinBoardEndpoint(uri, method)) {
+            final BulletinBoardEntity responseEntity = clientResponse.getClientResponse().getEntity(BulletinBoardEntity.class);
+            final BulletinBoardDTO responseDto = responseEntity.getBulletinBoard();
+
+            final Map<NodeIdentifier, BulletinBoardDTO> resultsMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final BulletinBoardEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(BulletinBoardEntity.class);
+                final BulletinBoardDTO nodeStatus = nodeResponseEntity.getBulletinBoard();
+
+                resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+            }
+            mergeBulletinBoard(responseDto, resultsMap);
+
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && isProcessorStatusHistoryEndpoint(uri, method)) {
+            final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>();
+            for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) {
+                metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor());
+            }
+
+            clientResponse = mergeStatusHistoryResponses(clientResponse, updatedNodesMap, problematicNodeResponses, metricDescriptors);
+        } else if (hasSuccessfulClientResponse && isConnectionStatusHistoryEndpoint(uri, method)) {
+            final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>();
+            for (final ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) {
+                metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor());
+            }
+
+            clientResponse = mergeStatusHistoryResponses(clientResponse, updatedNodesMap, problematicNodeResponses, metricDescriptors);
+        } else if (hasSuccessfulClientResponse && isProcessGroupStatusHistoryEndpoint(uri, method)) {
+            final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>();
+            for (final ProcessGroupStatusDescriptor descriptor : ProcessGroupStatusDescriptor.values()) {
+                metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor());
+            }
+
+            clientResponse = mergeStatusHistoryResponses(clientResponse, updatedNodesMap, problematicNodeResponses, metricDescriptors);
+        } else if (hasSuccessfulClientResponse && isRemoteProcessGroupStatusHistoryEndpoint(uri, method)) {
+            final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>();
+            for (final RemoteProcessGroupStatusDescriptor descriptor : RemoteProcessGroupStatusDescriptor.values()) {
+                metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor());
+            }
+
+            clientResponse = mergeStatusHistoryResponses(clientResponse, updatedNodesMap, problematicNodeResponses, metricDescriptors);
+        } else if (hasSuccessfulClientResponse && isSystemDiagnosticsEndpoint(uri, method)) {
+            final SystemDiagnosticsEntity responseEntity = clientResponse.getClientResponse().getEntity(SystemDiagnosticsEntity.class);
+            final SystemDiagnosticsDTO responseDto = responseEntity.getSystemDiagnostics();
+
+            NodeIdentifier nodeIdentifier = null;
+
+            final Map<NodeIdentifier, SystemDiagnosticsDTO> resultsMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final SystemDiagnosticsEntity nodeResponseEntity;
+                if (nodeResponse == clientResponse) {
+                    nodeIdentifier = nodeResponse.getNodeId();
+                    nodeResponseEntity = responseEntity;
+                } else {
+                    nodeResponseEntity = nodeResponse.getClientResponse().getEntity(SystemDiagnosticsEntity.class);
+                }
+
+                final SystemDiagnosticsDTO nodeStatus = nodeResponseEntity.getSystemDiagnostics();
+                resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+            }
+            mergeSystemDiagnostics(responseDto, nodeIdentifier, resultsMap);
+
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && isCountersEndpoint(uri, method)) {
+            final CountersEntity responseEntity = clientResponse.getClientResponse().getEntity(CountersEntity.class);
+            final CountersDTO responseDto = responseEntity.getCounters();
+
+            NodeIdentifier nodeIdentifier = null;
+
+            final Map<NodeIdentifier, CountersDTO> resultsMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final CountersEntity nodeResponseEntity;
+                if (nodeResponse == clientResponse) {
+                    nodeIdentifier = nodeResponse.getNodeId();
+                    nodeResponseEntity = responseEntity;
+                } else {
+                    nodeResponseEntity = nodeResponse.getClientResponse().getEntity(CountersEntity.class);
+                }
+
+                final CountersDTO nodeStatus = nodeResponseEntity.getCounters();
+                resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+            }
+            mergeCounters(responseDto, nodeIdentifier, resultsMap);
+
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
         } else {
             if (!nodeResponsesToDrain.isEmpty()) {
                 drainResponses(nodeResponsesToDrain);
@@ -3603,6 +4209,49 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         return clientResponse;
     }
 
+
+    private NodeResponse mergeStatusHistoryResponses(NodeResponse clientResponse, Map<Node, NodeResponse> updatedNodesMap, Set<NodeResponse> problematicNodeResponses,
+        Map<String, MetricDescriptor<?>> metricDescriptors) {
+        final StatusHistoryEntity responseEntity = clientResponse.getClientResponse().getEntity(StatusHistoryEntity.class);
+
+        StatusHistoryDTO lastStatusHistory = null;
+        final List<NodeStatusSnapshotsDTO> nodeStatusSnapshots = new ArrayList<>(updatedNodesMap.size());
+        for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+            if (problematicNodeResponses.contains(nodeResponse)) {
+                continue;
+            }
+
+            final StatusHistoryEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(StatusHistoryEntity.class);
+            final StatusHistoryDTO nodeStatus = nodeResponseEntity.getStatusHistory();
+            lastStatusHistory = nodeStatus;
+
+            final NodeIdentifier nodeId = nodeResponse.getNodeId();
+            final NodeStatusSnapshotsDTO nodeStatusSnapshot = new NodeStatusSnapshotsDTO();
+            nodeStatusSnapshot.setNodeId(nodeId.getId());
+            nodeStatusSnapshot.setAddress(nodeId.getApiAddress());
+            nodeStatusSnapshot.setApiPort(nodeId.getApiPort());
+            nodeStatusSnapshot.setStatusSnapshots(nodeStatus.getAggregateSnapshots());
+            nodeStatusSnapshots.add(nodeStatusSnapshot);
+        }
+
+        final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO();
+        clusterStatusHistory.setAggregateSnapshots(mergeStatusHistories(nodeStatusSnapshots, metricDescriptors));
+        clusterStatusHistory.setGenerated(new Date());
+        clusterStatusHistory.setNodeSnapshots(nodeStatusSnapshots);
+        if (lastStatusHistory != null) {
+            clusterStatusHistory.setComponentDetails(lastStatusHistory.getComponentDetails());
+            clusterStatusHistory.setFieldDescriptors(lastStatusHistory.getFieldDescriptors());
+        }
+
+        final StatusHistoryEntity clusterEntity = new StatusHistoryEntity();
+        clusterEntity.setStatusHistory(clusterStatusHistory);
+        clusterEntity.setRevision(responseEntity.getRevision());
+
+        return new NodeResponse(clientResponse, clusterEntity);
+    }
+
+
+
     /**
      * Determines if all problematic responses were due to 404 NOT_FOUND. Assumes that problematicNodeResponses is not empty and is not comprised of responses from all nodes in the cluster (at least
      * one node contained the counter in question).
@@ -3612,7 +4261,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
      * @return Whether all problematic node responses were due to a missing counter
      */
     private boolean isMissingCounter(final Set<NodeResponse> problematicNodeResponses, final URI uri) {
-        if (isCountersEndpoint(uri)) {
+        if (isCounterEndpoint(uri)) {
             boolean notFound = true;
             for (final NodeResponse problematicResponse : problematicNodeResponses) {
                 if (problematicResponse.getStatus() != 404) {
@@ -4026,207 +4675,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         return bulletinRepository;
     }
 
-    @Override
-    public ProcessGroupStatus getProcessGroupStatus(final String groupId) {
-        final Set<Node> connectedNodes = getNodes(Node.Status.CONNECTED);
-
-        // ensure there are some nodes in the cluster
-        if (connectedNodes.isEmpty()) {
-            throw new NoConnectedNodesException();
-        }
-
-        ProcessGroupStatus mergedProcessGroupStatus = null;
-        for (final Node node : connectedNodes) {
-            final NodeIdentifier nodeId = node.getNodeId();
-            final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
-            if (nodeHeartbeatPayload == null) {
-                continue;
-            }
-            final ProcessGroupStatus nodeRootProcessGroupStatus = nodeHeartbeatPayload.getProcessGroupStatus();
-            final ProcessGroupStatus nodeProcessGroupStatus = groupId.equals(ROOT_GROUP_ID_ALIAS) ? nodeRootProcessGroupStatus : getProcessGroupStatus(nodeRootProcessGroupStatus, groupId);
-            if (nodeProcessGroupStatus == null) {
-                continue;
-            }
-
-            if (mergedProcessGroupStatus == null) {
-                mergedProcessGroupStatus = nodeProcessGroupStatus.clone();
-
-                // update any  issues with the node label
-                if (mergedProcessGroupStatus.getRemoteProcessGroupStatus() != null) {
-                    for (final RemoteProcessGroupStatus remoteProcessGroupStatus : mergedProcessGroupStatus.getRemoteProcessGroupStatus()) {
-                        final List<String> nodeAuthorizationIssues = remoteProcessGroupStatus.getAuthorizationIssues();
-                        if (!nodeAuthorizationIssues.isEmpty()) {
-                            for (final ListIterator<String> iter = nodeAuthorizationIssues.listIterator(); iter.hasNext();) {
-                                final String Issue = iter.next();
-                                iter.set("[" + nodeId.getApiAddress() + ":" + nodeId.getApiPort() + "] -- " + Issue);
-                            }
-                            remoteProcessGroupStatus.setAuthorizationIssues(nodeAuthorizationIssues);
-                        }
-                    }
-                }
-            } else {
-                final ProcessGroupStatus nodeClone = nodeProcessGroupStatus.clone();
-                for (final RemoteProcessGroupStatus remoteProcessGroupStatus : nodeClone.getRemoteProcessGroupStatus()) {
-                    final List<String> nodeAuthorizationIssues = remoteProcessGroupStatus.getAuthorizationIssues();
-                    if (!nodeAuthorizationIssues.isEmpty()) {
-                        for (final ListIterator<String> iter = nodeAuthorizationIssues.listIterator(); iter.hasNext();) {
-                            final String Issue = iter.next();
-                            iter.set("[" + nodeId.getApiAddress() + ":" + nodeId.getApiPort() + "] -- " + Issue);
-                        }
-                        remoteProcessGroupStatus.setAuthorizationIssues(nodeAuthorizationIssues);
-                    }
-                }
-
-                ProcessGroupStatus.merge(mergedProcessGroupStatus, nodeClone);
-            }
-        }
-
-        return mergedProcessGroupStatus;
-    }
-
-    private ProcessGroupStatus getProcessGroupStatus(final ProcessGroupStatus parent, final String groupId) {
-        if (parent.getId().equals(groupId)) {
-            return parent;
-        }
-
-        for (final ProcessGroupStatus child : parent.getProcessGroupStatus()) {
-            final ProcessGroupStatus matching = getProcessGroupStatus(child, groupId);
-            if (matching != null) {
-                return matching;
-            }
-        }
-
-        return null;
-    }
-
-    @Override
-    public SystemDiagnostics getSystemDiagnostics() {
-        final Set<Node> connectedNodes = getNodes(Node.Status.CONNECTED);
-
-        // ensure there are some nodes...
-        if (connectedNodes.isEmpty()) {
-            throw new NoConnectedNodesException();
-        }
-
-        SystemDiagnostics clusterDiagnostics = null;
-        for (final Node node : connectedNodes) {
-            final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
-            if (nodeHeartbeatPayload == null) {
-                continue;
-            }
-            final SystemDiagnostics nodeDiagnostics = nodeHeartbeatPayload.getSystemDiagnostics();
-            if (nodeDiagnostics == null) {
-                continue;
-            }
-
-            if (clusterDiagnostics == null) {
-                clusterDiagnostics = nodeDiagnostics.clone();
-            } else {
-                merge(clusterDiagnostics, nodeDiagnostics);
-            }
-        }
-
-        return clusterDiagnostics;
-    }
-
-    private void merge(final SystemDiagnostics target, final SystemDiagnostics sd) {
-
-        // threads
-        target.setDaemonThreads(target.getDaemonThreads() + sd.getDaemonThreads());
-        target.setTotalThreads(target.getTotalThreads() + sd.getTotalThreads());
-
-        // heap
-        target.setTotalHeap(target.getTotalHeap() + sd.getTotalHeap());
-        target.setUsedHeap(target.getUsedHeap() + sd.getUsedHeap());
-        target.setMaxHeap(target.getMaxHeap() + sd.getMaxHeap());
-
-        // non heap
-        target.setTotalNonHeap(target.getTotalNonHeap() + sd.getTotalNonHeap());
-        target.setUsedNonHeap(target.getUsedNonHeap() + sd.getUsedNonHeap());
-        target.setMaxNonHeap(target.getMaxNonHeap() + sd.getMaxNonHeap());
-
-        // processors
-        target.setAvailableProcessors(target.getAvailableProcessors() + sd.getAvailableProcessors());
-
-        // load
-        if (sd.getProcessorLoadAverage() != null) {
-            if (target.getProcessorLoadAverage() != null) {
-                target.setProcessorLoadAverage(target.getProcessorLoadAverage() + sd.getProcessorLoadAverage());
-            } else {
-                target.setProcessorLoadAverage(sd.getProcessorLoadAverage());
-            }
-        }
-
-        // db disk usage
-        merge(target.getFlowFileRepositoryStorageUsage(), sd.getFlowFileRepositoryStorageUsage());
-
-        // repo disk usage
-        final Map<String, StorageUsage> targetContentRepoMap;
-        if (target.getContentRepositoryStorageUsage() == null) {
-            targetContentRepoMap = new LinkedHashMap<>();
-            target.setContentRepositoryStorageUsage(targetContentRepoMap);
-        } else {
-            targetContentRepoMap = target.getContentRepositoryStorageUsage();
-        }
-        if (sd.getContentRepositoryStorageUsage() != null) {
-            for (final Map.Entry<String, StorageUsage> sdEntry : sd.getContentRepositoryStorageUsage().entrySet()) {
-                final StorageUsage mergedDiskUsage = targetContentRepoMap.get(sdEntry.getKey());
-                if (mergedDiskUsage == null) {
-                    targetContentRepoMap.put(sdEntry.getKey(), sdEntry.getValue());
-                } else {
-                    merge(mergedDiskUsage, sdEntry.getValue());
-                }
-            }
-        }
-
-        // garbage collection
-        final Map<String, GarbageCollection> targetGarbageCollection;
-        if (target.getGarbageCollection() == null) {
-            targetGarbageCollection = new LinkedHashMap<>();
-            target.setGarbageCollection(targetGarbageCollection);
-        } else {
-            targetGarbageCollection = target.getGarbageCollection();
-        }
-        if (sd.getGarbageCollection() != null) {
-            for (final Map.Entry<String, GarbageCollection> gcEntry : sd.getGarbageCollection().entrySet()) {
-                final GarbageCollection mergedGarbageCollection = targetGarbageCollection.get(gcEntry.getKey());
-                if (mergedGarbageCollection == null) {
-                    targetGarbageCollection.put(gcEntry.getKey(), gcEntry.getValue().clone());
-                } else {
-                    merge(mergedGarbageCollection, gcEntry.getValue());
-                }
-            }
-        }
-    }
-
-    private void merge(final StorageUsage target, final StorageUsage du) {
-        target.setFreeSpace(target.getFreeSpace() + du.getFreeSpace());
-        target.setTotalSpace(target.getTotalSpace() + du.getTotalSpace());
-    }
-
-    private void merge(final GarbageCollection target, final GarbageCollection gc) {
-        target.setCollectionCount(target.getCollectionCount() + gc.getCollectionCount());
-        target.setCollectionTime(target.getCollectionTime() + gc.getCollectionTime());
-    }
 
     public static Date normalizeStatusSnapshotDate(final Date toNormalize, final long numMillis) {
         final long time = toNormalize.getTime();
         return new Date(time - time % numMillis);
     }
 
-    private NodeDTO createNodeDTO(final Node node) {
-        final NodeDTO nodeDto = new NodeDTO();
-        final NodeIdentifier nodeId = node.getNodeId();
-        nodeDto.setNodeId(nodeId.getId());
-        nodeDto.setAddress(nodeId.getApiAddress());
-        nodeDto.setApiPort(nodeId.getApiPort());
-        nodeDto.setStatus(node.getStatus().name());
-        nodeDto.setPrimary(node.equals(getPrimaryNode()));
-        final Date connectionRequested = new Date(node.getConnectionRequestedTimestamp());
-        nodeDto.setConnectionRequested(connectionRequested);
-
-        return nodeDto;
-    }
 
     private List<StatusSnapshotDTO> aggregate(Map<Date, List<StatusSnapshot>> snapshotsToAggregate) {
         // Aggregate the snapshots
@@ -4245,278 +4699,65 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         return aggregatedSnapshotDtos;
     }
 
-    public ClusterStatusHistoryDTO getProcessorStatusHistory(final String processorId) {
-        return getProcessorStatusHistory(processorId, null, null, Integer.MAX_VALUE);
-    }
-
-    public ClusterStatusHistoryDTO getProcessorStatusHistory(final String processorId, final Date startDate, final Date endDate, final int preferredDataPoints) {
-        final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
 
-        StatusHistoryDTO lastStatusHistory = null;
-        final Set<MetricDescriptor<?>> processorDescriptors = new LinkedHashSet<>();
-        final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+    private StatusSnapshot createSnapshot(final StatusSnapshotDTO snapshotDto, final Map<String, MetricDescriptor<?>> metricDescriptors) {
+        final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
+        snapshot.setTimestamp(snapshotDto.getTimestamp());
 
-        for (final Node node : getRawNodes()) {
-            final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
-            if (statusRepository == null) {
-                continue;
-            }
+        final Map<String, Long> metrics = snapshotDto.getStatusMetrics();
+        for (final Map.Entry<String, Long> entry : metrics.entrySet()) {
+            final String metricId = entry.getKey();
+            final Long value = entry.getValue();
 
-            final StatusHistory statusHistory = statusRepository.getProcessorStatusHistory(processorId, startDate, endDate, preferredDataPoints);
-            if (statusHistory == null) {
-                continue;
-            }
-
-            processorDescriptors.addAll(statusRepository.getProcessorMetricDescriptors());
-
-            // record the status history (last) to get the component details for use later
-            final StatusHistoryDTO statusHistoryDto = createStatusHistoryDto(statusHistory);
-            lastStatusHistory = statusHistoryDto;
-
-            final NodeStatusHistoryDTO nodeHistory = new NodeStatusHistoryDTO();
-            nodeHistory.setStatusHistory(statusHistoryDto);
-            nodeHistory.setNode(createNodeDTO(node));
-            nodeHistories.add(nodeHistory);
-
-            // collect all of the snapshots to aggregate
-            for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) {
-                final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis);
-                List<StatusSnapshot> snapshots = snapshotsToAggregate.get(normalizedDate);
-                if (snapshots == null) {
-                    snapshots = new ArrayList<>();
-                    snapshotsToAggregate.put(normalizedDate, snapshots);
-                }
-                snapshots.add(snapshot);
+            final MetricDescriptor<?> descriptor = metricDescriptors.get(metricId);
+            if (descriptor != null) {
+                snapshot.addStatusMetric(descriptor, value);
             }
         }
 
-        // Aggregate the snapshots
-        final List<StatusSnapshotDTO> aggregatedSnapshotDtos = aggregate(snapshotsToAggregate);
-
-        // get the details for this component from the last status history
-        final LinkedHashMap<String, String> clusterStatusHistoryDetails = new LinkedHashMap<>();
-        clusterStatusHistoryDetails.putAll(lastStatusHistory.getDetails());
-
-        final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO();
-        clusterStatusHistory.setGenerated(new Date());
-        clusterStatusHistory.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(processorDescriptors));
-        clusterStatusHistory.setDetails(clusterStatusHistoryDetails);
-        clusterStatusHistory.setStatusSnapshots(aggregatedSnapshotDtos);
-
-        final ClusterStatusHistoryDTO history = new ClusterStatusHistoryDTO();
-        history.setGenerated(new Date());
-        history.setNodeStatusHistory(nodeHistories);
-        history.setClusterStatusHistory(clusterStatusHistory);
-        return history;
+        return snapshot;
     }
 
-    public StatusHistoryDTO createStatusHistoryDto(final StatusHistory statusHistory) {
-        final StatusHistoryDTO dto = new StatusHistoryDTO();
-
-        dto.setDetails(new LinkedHashMap<>(statusHistory.getComponentDetails()));
-        dto.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(statusHistory));
-        dto.setGenerated(statusHistory.getDateGenerated());
+    private List<StatusSnapshotDTO> mergeStatusHistories(final List<NodeStatusSnapshotsDTO> nodeStatusSnapshots, final Map<String, MetricDescriptor<?>> metricDescriptors) {
+        // We want a Map<Date, List<StatusSnapshot>>, which is a Map of "normalized Date" (i.e., a time range, essentially)
+        // to all Snapshots for that time. The list will contain one snapshot for each node. However, we can have the case
+        // where the NCM has a different value for the componentStatusSnapshotMillis than the nodes have. In this case,
+        // we end up with multiple entries in the List<StatusSnapshot> for the same node/timestamp, which skews our aggregate
+        // results. In order to avoid this, we will use only the latest snapshot for a node that falls into the time range
+        // of interest.
+        // To accomplish this, we have an intermediate data structure, which is a Map of "normalized Date" to an inner Map
+        // of Node Identifier to StatusSnapshot. We then will flatten this Map and aggregate the results.
+        final Map<Date, Map<String, StatusSnapshot>> dateToNodeSnapshots = new TreeMap<>();
 
-        final List<StatusSnapshotDTO> statusSnapshots = new ArrayList<>();
-        for (final StatusSnapshot statusSnapshot : statusHistory.getStatusSnapshots()) {
-            statusSnapshots.add(StatusHistoryUtil.createStatusSnapshotDto(statusSnapshot));
-        }
-        dto.setStatusSnapshots(statusSnapshots);
-
-        return dto;
-    }
-
-    public ClusterStatusHistoryDTO getConnectionStatusHistory(final String connectionId) {
-        return getConnectionStatusHistory(connectionId, null, null, Integer.MAX_VALUE);
-    }
-
-    public ClusterStatusHistoryDTO getConnectionStatusHistory(final String connectionId, final Date startDate, final Date endDate, final int preferredDataPoints) {
-        final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
-
-        StatusHistoryDTO lastStatusHistory = null;
-        final Set<MetricDescriptor<?>> connectionDescriptors = new LinkedHashSet<>();
-        final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
-
-        for (final Node node : getRawNodes()) {
-            final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
-            if (statusRepository == null) {
-                continue;
-            }
-
-            final StatusHistory statusHistory = statusRepository.getConnectionStatusHistory(connectionId, startDate, endDate, preferredDataPoints);
-            if (statusHistory == null) {
-                continue;
-            }
-
-            final StatusHistoryDTO statusHistoryDto = createStatusHistoryDto(statusHistory);
-            // record the status history (last) to get the componet details for use later
-            lastStatusHistory = statusHistoryDto;
-            connectionDescriptors.addAll(statusRepository.getConnectionMetricDescriptors());
-
-            final NodeStatusHistoryDTO nodeHistory = new NodeStatusHistoryDTO();
-            nodeHistory.setStatusHistory(statusHistoryDto);
-            nodeHistory.setNode(createNodeDTO(node));
-            nodeHistories.add(nodeHistory);
-
-            // collect all of the snapshots to aggregate
-            for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) {
+        // group status snapshots for each node by date
+        for (final NodeStatusSnapshotsDTO nodeStatusSnapshot : nodeStatusSnapshots) {
+            for (final StatusSnapshotDTO snapshotDto : nodeStatusSnapshot.getStatusSnapshots()) {
+                final StatusSnapshot snapshot = createSnapshot(snapshotDto, metricDescriptors);
                 final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis);
-                List<StatusSnapshot> snapshots = snapshotsToAggregate.get(normalizedDate);
-                if (snapshots == null) {
-                    snapshots = new ArrayList<>();
-                    snapshotsToAggregate.put(normalizedDate, snapshots);
-                }
-                snapshots.add(snapshot);
-            }
-        }
-
-        // Aggregate the snapshots
-        final List<StatusSnapshotDTO> aggregatedSnapshotDtos = aggregate(snapshotsToAggregate);
-
-        // get the details for this component from the last status history
-        final LinkedHashMap<String, String> clusterStatusHistoryDetails = new LinkedHashMap<>();
-        clusterStatusHistoryDetails.putAll(lastStatusHistory.getDetails());
-
-        final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO();
-        clusterStatusHistory.setGenerated(new Date());
-        clusterStatusHistory.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(connectionDescriptors));
-        clusterStatusHistory.setDetails(clusterStatusHistoryDetails);
-        clusterStatusHistory.setStatusSnapshots(aggregatedSnapshotDtos);
-
-        final ClusterStatusHistoryDTO history = new ClusterStatusHistoryDTO();
-        history.setGenerated(new Date());
-        history.setNodeStatusHistory(nodeHistories);
-        history.setClusterStatusHistory(clusterStatusHistory);
-        return history;
-    }
-
-    public ClusterStatusHistoryDTO getProcessGroupStatusHistory(final String processGroupId) {
-        return getProcessGroupStatusHistory(processGroupId, null, null, Integer.MAX_VALUE);
-    }
-
-    public ClusterStatusHistoryDTO getProcessGroupStatusHistory(final String processGroupId, final Date startDate, final Date endDate, final int preferredDataPoints) {
-        final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
 
-        StatusHistoryDTO lastStatusHistory = null;
-        final Set<MetricDescriptor<?>> processGroupDescriptors = new LinkedHashSet<>();
-        final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
-
-        for (final Node node : getRawNodes()) {
-            final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
-            if (statusRepository == null) {
-                continue;
-            }
-
-            final StatusHistory statusHistory = statusRepository.getProcessGroupStatusHistory(processGroupId, startDate, endDate, preferredDataPoints);
-            if (statusHistory == null) {
-                continue;
-            }
-
-            final StatusHistoryDTO statusHistoryDto = createStatusHistoryDto(statusHistory);
-            // record the status history (last) to get the componet details for use later
-            lastStatusHistory = statusHistoryDto;
-            processGroupDescriptors.addAll(statusRepository.getProcessGroupMetricDescriptors());
-
-            final NodeStatusHistoryDTO nodeHistory = new NodeStatusHistoryDTO();
-            nodeHistory.setStatusHistory(statusHistoryDto);
-            nodeHistory.setNode(createNodeDTO(node));
-            nodeHistories.add(nodeHistory);
-
-            // collect all of the snapshots to aggregate
-            for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) {
-                final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis);
-                List<StatusSnapshot> snapshots = snapshotsToAggregate.get(normalizedDate);
-                if (snapshots == null) {
-                    snapshots = new ArrayList<>();
-                    snapshotsToAggregate.put(normalizedDate, snapshots);
+                Map<String, StatusSnapshot> nodeToSnapshotMap = dateToNodeSnapshots.get(normalizedDate);
+                if (nodeToSnapshotMap == null) {
+                    nodeToSnapshotMap = new HashMap<>();
+                    dateToNodeSnapshots.put(normalizedDate, nodeToSnapshotMap);
                 }
-                snapshots.add(snapshot);
+                nodeToSnapshotMap.put(nodeStatusSnapshot.getNodeId(), snapshot);
             }
         }
 
-        // Aggregate the snapshots
-        final List<StatusSnapshotDTO> aggregatedSnapshotDtos = aggregate(snapshotsToAggregate);
-
-        // get the details for this component from the last status history
-        final LinkedHashMap<String, String> clusterStatusHistoryDetails = new LinkedHashMap<>();
-        clusterStatusHistoryDetails.putAll(lastStatusHistory.getDetails());
-
-        final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO();
-        clusterStatusHistory.setGenerated(new Date());
-        clusterStatusHistory.setDetails(clusterStatusHistoryDetails);
-        clusterStatusHistory.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(processGroupDescriptors));
-        clusterStatusHistory.setStatusSnapshots(aggregatedSnapshotDtos);
-
-        final ClusterStatusHistoryDTO history = new ClusterStatusHistoryDTO();
-        history.setGenerated(new Date());
-        history.setNodeStatusHistory(nodeHistories);
-        history.setClusterStatusHistory(clusterStatusHistory);
-        return history;
-    }
-
-    public ClusterStatusHistoryDTO getRemoteProcessGroupStatusHistory(final String remoteGroupId) {
-        return getRemoteProcessGroupStatusHistory(remoteGroupId, null, null, Integer.MAX_VALUE);
-    }
-
-    public ClusterStatusHistoryDTO getRemoteProcessGroupStatusHistory(final String remoteGroupId, final Date startDate, final Date endDate, final int preferredDataPoints) {
-        final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
-
-        StatusHistoryDTO lastStatusHistory = null;
-        final Set<MetricDescriptor<?>> remoteProcessGroupDescriptors = new LinkedHashSet<>();
+        // aggregate the snapshots by (normalized) timestamp
         final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
-
-        for (final Node node : getRawNodes()) {
-            final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
-            if (statusRepository == null) {
-                continue;
-            }
-
-            final StatusHistory statusHistory = statusRepository.getRemoteProcessGroupStatusHistory(remoteGroupId, startDate, endDate, preferredDataPoints);
-            if (statusHistory == null) {
-                continue;
-            }
-
-            final StatusHistoryDTO statusHistoryDto = createStatusHistoryDto(statusHistory);
-            // record the status history (last) to get the componet details for use later
-            lastStatusHistory = statusHistoryDto;
-            remoteProcessGroupDescriptors.addAll(statusRepository.getRemoteProcessGroupMetricDescriptors());
-
-            final NodeStatusHistoryDTO nodeHistory = new NodeStatusHistoryDTO();
-            nodeHistory.setStatusHistory(statusHistoryDto);
-            nodeHistory.setNode(createNodeDTO(node));
-            nodeHistories.add(nodeHistory);
-
-            // collect all of the snapshots to aggregate
-            for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) {
-                final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis);
-                List<StatusSnapshot> snapshots = snapshotsToAggregate.get(normalizedDate);
-                if (snapshots == null) {
-                    snapshots = new ArrayList<>();
-                    snapshotsToAggregate.put(normalizedDate, snapshots);
-                }
-                snapshots.add(snapshot);
-            }
+        for (final Map.Entry<Date, Map<String, StatusSnapshot>> entry : dateToNodeSnapshots.entrySet()) {
+            final Date normalizedDate = entry.getKey();
+            final Map<String, StatusSnapshot> nodeToSnapshot = entry.getValue();
+            final List<StatusSnapshot> snapshotsForTimestamp = new ArrayList<>(nodeToSnapshot.values());
+            snapshotsToAggregate.put(normalizedDate, snapshotsForTimestamp);
         }
 
-        // Aggregate the snapshots
-        final List<StatusSnapshotDTO> aggregatedSnapshotDtos = aggregate(snapshotsToAggregate);
+        final List<StatusSnapshotDTO> aggregatedSnapshots = aggregate(snapshotsToAggregate);
+        return aggregatedSnapshots;
+    }
 
-        // get the details for this comp

<TRUNCATED>

Mime
View raw message