nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbe...@apache.org
Subject [nifi] branch master updated: NIFI-6598 Storing peers into managed-state - Fixed checkstyle errors. - Added PeerPersistence interface. - Expose RemoteProcessGroup state via REST API - Made stateManager transient.
Date Tue, 01 Oct 2019 13:56:55 GMT
This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6541eac  NIFI-6598 Storing peers into managed-state - Fixed checkstyle errors. - Added PeerPersistence interface. - Expose RemoteProcessGroup state via REST API - Made stateManager transient.
6541eac is described below

commit 6541eac625ff1d721867152d728c53483eab822a
Author: Koji Kawamura <ijokarumawak@apache.org>
AuthorDate: Tue Aug 27 16:53:02 2019 +0900

    NIFI-6598 Storing peers into managed-state
    - Fixed checkstyle errors.
    - Added PeerPersistence interface.
    - Expose RemoteProcessGroup state via REST API
    - Made stateManager transient.
    
    This closes #3677.
    
    Signed-off-by: Bryan Bende <bbende@apache.org>
---
 .../java/org/apache/nifi/util/NiFiProperties.java  |  12 --
 .../remote/client/AbstractPeerPersistence.java     |  93 +++++++++++++++
 .../nifi/remote/client/FilePeerPersistence.java    |  55 +++++++++
 .../PeerPersistence.java}                          |  28 +----
 .../apache/nifi/remote/client/PeerSelector.java    | 100 +++++-----------
 .../nifi/remote/client/PeerStatusProvider.java     |   7 ++
 .../nifi/remote/client/SiteToSiteClient.java       |  54 ++++++++-
 .../nifi/remote/client/SiteToSiteClientConfig.java |  11 ++
 .../nifi/remote/client/StatePeerPersistence.java   |  65 +++++++++++
 .../apache/nifi/remote/client/http/HttpClient.java |   8 +-
 .../client/socket/EndpointConnectionPool.java      |  15 ++-
 .../nifi/remote/client/socket/SocketClient.java    |   2 +-
 .../apache/nifi/remote/util/PeerStatusCache.java   |  14 ++-
 .../nifi/remote/client/TestPeerSelector.java       | 128 +++++++++++++++++++++
 .../remote/client/socket/TestSiteToSiteClient.java |  38 +++++-
 .../org/apache/nifi/groups/RemoteProcessGroup.java |   3 +
 .../nifi/controller/flow/StandardFlowManager.java  |   4 +-
 .../apache/nifi/groups/StandardProcessGroup.java   |   8 ++
 .../nifi/remote/StandardRemoteProcessGroup.java    |  22 ++--
 .../nifi/remote/StandardRemoteGroupPort.java       |   9 +-
 .../org/apache/nifi/web/NiFiServiceFacade.java     |   8 ++
 .../apache/nifi/web/StandardNiFiServiceFacade.java |  10 ++
 .../nifi/web/api/RemoteProcessGroupResource.java   |  57 +++++++++
 .../org/apache/nifi/web/dao/ComponentStateDAO.java |  10 ++
 .../apache/nifi/web/dao/RemoteProcessGroupDAO.java |  10 ++
 .../web/dao/impl/StandardComponentStateDAO.java    |   6 +
 .../dao/impl/StandardRemoteProcessGroupDAO.java    |  14 +++
 .../src/main/resources/nifi-web-api-context.xml    |   1 +
 28 files changed, 652 insertions(+), 140 deletions(-)

diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index afcd268..0315fdf 100644
--- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -77,7 +77,6 @@ public abstract class NiFiProperties {
     public static final String REMOTE_CONTENTS_CACHE_EXPIRATION = "nifi.remote.contents.cache.expiration";
     public static final String TEMPLATE_DIRECTORY = "nifi.templates.directory";
     public static final String ADMINISTRATIVE_YIELD_DURATION = "nifi.administrative.yield.duration";
-    public static final String PERSISTENT_STATE_DIRECTORY = "nifi.persistent.state.directory";
     public static final String BORED_YIELD_DURATION = "nifi.bored.yield.duration";
     public static final String PROCESSOR_SCHEDULING_TIMEOUT = "nifi.processor.scheduling.timeout";
     public static final String BACKPRESSURE_COUNT = "nifi.queue.backpressure.count";
@@ -272,7 +271,6 @@ public abstract class NiFiProperties {
     public static final long DEFAULT_BACKPRESSURE_COUNT = 10_000L;
     public static final String DEFAULT_BACKPRESSURE_SIZE = "1 GB";
     public static final String DEFAULT_ADMINISTRATIVE_YIELD_DURATION = "30 sec";
-    public static final String DEFAULT_PERSISTENT_STATE_DIRECTORY = "./conf/state";
     public static final String DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "5 mins";
     public static final String DEFAULT_BORED_YIELD_DURATION = "10 millis";
     public static final String DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT = "3 secs";
@@ -729,16 +727,6 @@ public abstract class NiFiProperties {
                 DEFAULT_CLUSTER_NODE_CONNECTION_TIMEOUT);
     }
 
-    public File getPersistentStateDirectory() {
-        final String dirName = getProperty(PERSISTENT_STATE_DIRECTORY,
-                DEFAULT_PERSISTENT_STATE_DIRECTORY);
-        final File file = new File(dirName);
-        if (!file.exists()) {
-            file.mkdirs();
-        }
-        return file;
-    }
-
     // getters for cluster node properties //
     public boolean isNode() {
         return Boolean.parseBoolean(getProperty(CLUSTER_IS_NODE));
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractPeerPersistence.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractPeerPersistence.java
new file mode 100644
index 0000000..758b425
--- /dev/null
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractPeerPersistence.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.remote.util.PeerStatusCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+public abstract class AbstractPeerPersistence implements PeerPersistence {
+
+    protected Logger logger = LoggerFactory.getLogger(getClass());
+
+    protected PeerStatusCache restorePeerStatuses(final BufferedReader reader,
+                                     long cachedTimestamp) throws IOException {
+        final SiteToSiteTransportProtocol transportProtocol;
+        try {
+            transportProtocol = SiteToSiteTransportProtocol.valueOf(reader.readLine());
+        } catch (IllegalArgumentException e) {
+            logger.info("Discard stored peer statuses in {} because transport protocol is not stored",
+                this.getClass().getSimpleName());
+            return null;
+        }
+
+        final Set<PeerStatus> restoredStatuses = readPeerStatuses(reader);
+
+        if (!restoredStatuses.isEmpty()) {
+            logger.info("Restored peer statuses from {} {}", this.getClass().getSimpleName(), restoredStatuses);
+            return new PeerStatusCache(restoredStatuses, cachedTimestamp, transportProtocol);
+        }
+
+        return null;
+    }
+
+    private Set<PeerStatus> readPeerStatuses(final BufferedReader reader) throws IOException {
+        final Set<PeerStatus> statuses = new HashSet<>();
+        String line;
+        while ((line = reader.readLine()) != null) {
+            final String[] splits = line.split(Pattern.quote(":"));
+            if (splits.length != 3 && splits.length != 4) {
+                continue;
+            }
+
+            final String hostname = splits[0];
+            final int port = Integer.parseInt(splits[1]);
+            final boolean secure = Boolean.parseBoolean(splits[2]);
+
+            final boolean supportQueryForPeer = splits.length == 4 && Boolean.parseBoolean(splits[3]);
+
+            statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1, supportQueryForPeer));
+        }
+
+        return statuses;
+    }
+
+
+    @FunctionalInterface
+    protected interface IOConsumer<T> {
+        void accept(T value) throws IOException;
+    }
+
+    protected void write(final PeerStatusCache peerStatusCache, final IOConsumer<String> consumer) throws IOException {
+        consumer.accept(peerStatusCache.getTransportProtocol().name() + "\n");
+        for (final PeerStatus status : peerStatusCache.getStatuses()) {
+            final PeerDescription description = status.getPeerDescription();
+            final String line = description.getHostname() + ":" + description.getPort() + ":" + description.isSecure() + ":" + status.isQueryForPeers() + "\n";
+            consumer.accept(line);
+        }
+    }
+
+}
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/FilePeerPersistence.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/FilePeerPersistence.java
new file mode 100644
index 0000000..8fa167d
--- /dev/null
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/FilePeerPersistence.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import org.apache.nifi.remote.util.PeerStatusCache;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+public class FilePeerPersistence extends AbstractPeerPersistence {
+
+    private final File persistenceFile;
+
+    public FilePeerPersistence(File persistenceFile) {
+        this.persistenceFile = persistenceFile;
+    }
+
+    @Override
+    public void save(final PeerStatusCache peerStatusCache) throws IOException {
+        try (final OutputStream fos = new FileOutputStream(persistenceFile);
+             final OutputStream out = new BufferedOutputStream(fos)) {
+            write(peerStatusCache, line -> out.write(line.getBytes(StandardCharsets.UTF_8)));
+        }
+    }
+
+    @Override
+    public PeerStatusCache restore() throws IOException {
+        try (final InputStream fis = new FileInputStream(persistenceFile);
+             final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) {
+            return restorePeerStatuses(reader, persistenceFile.lastModified());
+        }
+    }
+}
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerPersistence.java
similarity index 56%
copy from nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
copy to nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerPersistence.java
index c52b4b7..5698247 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerPersistence.java
@@ -14,31 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.remote.util;
+package org.apache.nifi.remote.client;
 
-import java.util.Set;
+import org.apache.nifi.remote.util.PeerStatusCache;
 
-import org.apache.nifi.remote.PeerStatus;
+import java.io.IOException;
 
-public class PeerStatusCache {
+public interface PeerPersistence {
 
-    private final Set<PeerStatus> statuses;
-    private final long timestamp;
+    void save(final PeerStatusCache peerStatusCache) throws IOException;
 
-    public PeerStatusCache(final Set<PeerStatus> statuses) {
-        this(statuses, System.currentTimeMillis());
-    }
-
-    public PeerStatusCache(final Set<PeerStatus> statuses, final long timestamp) {
-        this.statuses = statuses;
-        this.timestamp = timestamp;
-    }
-
-    public Set<PeerStatus> getStatuses() {
-        return statuses;
-    }
-
-    public long getTimestamp() {
-        return timestamp;
-    }
+    PeerStatusCache restore() throws IOException;
 }
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
index 14c163b..8235a38 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
@@ -21,20 +21,12 @@ import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.PeerDescription;
 import org.apache.nifi.remote.PeerStatus;
 import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.remote.util.PeerStatusCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -48,7 +40,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static org.apache.nifi.remote.util.EventReportUtil.error;
@@ -67,7 +58,7 @@ public class PeerSelector {
     private volatile long peerRefreshTime = 0L;
     private final AtomicLong peerIndex = new AtomicLong(0L);
     private volatile PeerStatusCache peerStatusCache;
-    private final File persistenceFile;
+    private final PeerPersistence peerPersistence;
 
     private EventReporter eventReporter;
 
@@ -90,70 +81,41 @@ public class PeerSelector {
         this.systemTime = systemTime;
     }
 
-    public PeerSelector(final PeerStatusProvider peerStatusProvider, final File persistenceFile) {
+    public PeerSelector(final PeerStatusProvider peerStatusProvider, final PeerPersistence peerPersistence) {
         this.peerStatusProvider = peerStatusProvider;
-        this.persistenceFile = persistenceFile;
-        Set<PeerStatus> recoveredStatuses;
-        if (persistenceFile != null && persistenceFile.exists()) {
-            try {
-                recoveredStatuses = recoverPersistedPeerStatuses(persistenceFile);
-                this.peerStatusCache = new PeerStatusCache(recoveredStatuses, persistenceFile.lastModified());
-            } catch (final IOException ioe) {
-                logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file", persistenceFile, ioe);
-            }
-        } else {
-            peerStatusCache = null;
-        }
-    }
-
-    private void persistPeerStatuses(final Set<PeerStatus> statuses) {
-        if (persistenceFile == null) {
-            return;
-        }
+        this.peerPersistence = peerPersistence;
 
-        try (final OutputStream fos = new FileOutputStream(persistenceFile);
-             final OutputStream out = new BufferedOutputStream(fos)) {
-
-            for (final PeerStatus status : statuses) {
-                final PeerDescription description = status.getPeerDescription();
-                final String line = description.getHostname() + ":" + description.getPort() + ":" + description.isSecure() + ":" + status.isQueryForPeers() + "\n";
-                out.write(line.getBytes(StandardCharsets.UTF_8));
+        try {
+            PeerStatusCache restoredPeerStatusCache = null;
+            if (peerPersistence != null) {
+                restoredPeerStatusCache = peerPersistence.restore();
+                if (restoredPeerStatusCache != null) {
+                    final SiteToSiteTransportProtocol currentProtocol = peerStatusProvider.getTransportProtocol();
+                    final SiteToSiteTransportProtocol cachedProtocol = restoredPeerStatusCache.getTransportProtocol();
+                    if (!currentProtocol.equals(cachedProtocol)) {
+                        logger.info("Discard stored peer statuses in {} because transport protocol has changed from {} to {}",
+                            peerPersistence.getClass().getSimpleName(), cachedProtocol, currentProtocol);
+                        restoredPeerStatusCache = null;
+                    }
+                }
             }
+            this.peerStatusCache = restoredPeerStatusCache;
 
-        } catch (final IOException e) {
-            error(logger, eventReporter, "Failed to persist list of Peers due to {}; if restarted and peer's NCM is down," +
-                    " may be unable to transfer data until communications with NCM are restored", e.toString());
-            logger.error("", e);
+        } catch (final IOException ioe) {
+            logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file",
+                peerPersistence.getClass().getSimpleName(), ioe);
         }
     }
 
-    private static Set<PeerStatus> recoverPersistedPeerStatuses(final File file) throws IOException {
-        if (!file.exists()) {
-            return null;
-        }
-
-        final Set<PeerStatus> statuses = new HashSet<>();
-        try (final InputStream fis = new FileInputStream(file);
-             final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) {
-
-            String line;
-            while ((line = reader.readLine()) != null) {
-                final String[] splits = line.split(Pattern.quote(":"));
-                if (splits.length != 3 && splits.length != 4) {
-                    continue;
-                }
-
-                final String hostname = splits[0];
-                final int port = Integer.parseInt(splits[1]);
-                final boolean secure = Boolean.parseBoolean(splits[2]);
-
-                final boolean supportQueryForPeer = splits.length == 4 && Boolean.parseBoolean(splits[3]);
-
-                statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1, supportQueryForPeer));
-            }
+    private void persistPeerStatuses() {
+        try {
+            peerPersistence.save(peerStatusCache);
+        } catch (final IOException e) {
+            error(logger, eventReporter, "Failed to persist list of Peers due to {}; if restarted" +
+                " and the nodes specified at the RPG are down," +
+                " may be unable to transfer data until communications with those nodes are restored", e.toString());
+            logger.error("", e);
         }
-
-        return statuses;
     }
 
     List<PeerStatus> formulateDestinationList(final Set<PeerStatus> statuses, final TransferDirection direction) {
@@ -340,8 +302,8 @@ public class PeerSelector {
 
         try {
             final Set<PeerStatus> statuses = fetchRemotePeerStatuses();
-            persistPeerStatuses(statuses);
-            peerStatusCache = new PeerStatusCache(statuses);
+            peerStatusCache = new PeerStatusCache(statuses, System.currentTimeMillis(), peerStatusProvider.getTransportProtocol());
+            persistPeerStatuses();
             logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size());
         } catch (Exception e) {
             warn(logger, eventReporter, "{} Unable to refresh Remote Group's peers due to {}", this, e.getMessage());
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
index 817bccf..64fd161 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
@@ -18,6 +18,7 @@ package org.apache.nifi.remote.client;
 
 import org.apache.nifi.remote.PeerDescription;
 import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 
 import java.io.IOException;
 import java.util.Set;
@@ -57,4 +58,10 @@ public interface PeerStatusProvider {
      */
     Set<PeerStatus> fetchRemotePeerStatuses(final PeerDescription peerDescription) throws IOException;
 
+    /**
+     * Returns the transport protocol being used.
+     * @return the transport protocol
+     */
+    SiteToSiteTransportProtocol getTransportProtocol();
+
 }
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 29eb465..b805d03 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.remote.client;
 
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
@@ -67,7 +68,7 @@ import java.util.concurrent.TimeUnit;
  * interaction with the remote instance takes place. After data has been
  * exchanged or it is determined that no data is available, the Transaction can
  * then be canceled (via the {@link Transaction#cancel(String)} method) or can
- * be completed (via the {@link Transaction#complete(boolean)} method).
+ * be completed (via the {@link Transaction#complete()} method).
  * </p>
  *
  * <p>
@@ -164,6 +165,7 @@ public interface SiteToSiteClient extends Closeable {
         private KeystoreType truststoreType;
         private EventReporter eventReporter = EventReporter.NO_OP;
         private File peerPersistenceFile;
+        private StateManager stateManager;
         private boolean useCompression;
         private String portName;
         private String portIdentifier;
@@ -482,8 +484,8 @@ public interface SiteToSiteClient extends Closeable {
         /**
          * Specifies a file that the client can write to in order to persist the
          * list of nodes in the remote cluster and recover the list of nodes
-         * upon restart. This allows the client to function if the remote
-         * Cluster Manager is unavailable, even after a restart of the client
+         * upon restart. This allows the client to function if the remote nodes
+         * specified by the urls are unavailable, even after a restart of the client
          * software. If not specified, the list of nodes will not be persisted
          * and a failure of the Cluster Manager will result in not being able to
          * communicate with the remote instance if a new client is created.
@@ -497,6 +499,32 @@ public interface SiteToSiteClient extends Closeable {
         }
 
         /**
+         * <p>Specifies a StateManager that the client can use to persist the
+         * list of nodes in the remote cluster and recover the list of nodes
+         * upon restart. This allows the client to function if the remote nodes
+         * specified by the urls are unavailable, even after a restart of the client
+         * software. If not specified, the list of nodes will not be persisted
+         * and a failure of the Cluster Manager will result in not being able to
+         * communicate with the remote instance if a new client is created.</p>
+         * <p>Using a StateManager is preferable to using a File to persist the list of nodes
+         * if the SiteToSiteClient is used by a NiFi component having access to a NiFi context,
+         * so that the list of nodes can be persisted in the same manner as other stateful information.</p>
+         * <p>Since StateManager is not serializable, the specified StateManager
+         * will not be serialized, and a de-serialized SiteToSiteClientConfig
+         * instance will not have StateManager even if the original config has one.
+         * Use {@link #peerPersistenceFile(File)} instead
+         * if the same SiteToSiteClientConfig needs to be distributed among multiple
+         * clients via serialization, and also persistent connectivity is required
+         * in case of having no available remote node specified by the urls when a client restarts.</p>
+         * @param stateManager state manager
+         * @return the builder
+         */
+        public Builder stateManager(final StateManager stateManager) {
+            this.stateManager = stateManager;
+            return this;
+        }
+
+        /**
          * Specifies whether or not data should be compressed before being
          * transferred to or from the remote instance.
          *
@@ -748,6 +776,7 @@ public interface SiteToSiteClient extends Closeable {
         private final KeystoreType truststoreType;
         private final EventReporter eventReporter;
         private final File peerPersistenceFile;
+        private final transient StateManager stateManager;
         private final boolean useCompression;
         private final SiteToSiteTransportProtocol transportProtocol;
         private final String portName;
@@ -773,6 +802,7 @@ public interface SiteToSiteClient extends Closeable {
             this.truststoreType = null;
             this.eventReporter = null;
             this.peerPersistenceFile = null;
+            this.stateManager = null;
             this.useCompression = false;
             this.portName = null;
             this.portIdentifier = null;
@@ -801,6 +831,7 @@ public interface SiteToSiteClient extends Closeable {
             this.truststoreType = builder.truststoreType;
             this.eventReporter = builder.eventReporter;
             this.peerPersistenceFile = builder.peerPersistenceFile;
+            this.stateManager = builder.stateManager;
             this.useCompression = builder.useCompression;
             this.portName = builder.portName;
             this.portIdentifier = builder.portIdentifier;
@@ -922,6 +953,23 @@ public interface SiteToSiteClient extends Closeable {
         }
 
         @Override
+        public StateManager getStateManager() {
+            return stateManager;
+        }
+
+        @Override
+        public PeerPersistence getPeerPersistence() {
+            if (stateManager != null) {
+                return new StatePeerPersistence(stateManager);
+
+            } else if (peerPersistenceFile != null) {
+                return new FilePeerPersistence(peerPersistenceFile);
+            }
+
+            return null;
+        }
+
+        @Override
         public EventReporter getEventReporter() {
             return eventReporter;
         }
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
index 8da5e70..604e078 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 
 import javax.net.ssl.SSLContext;
 
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
@@ -111,6 +112,16 @@ public interface SiteToSiteClientConfig extends Serializable {
     File getPeerPersistenceFile();
 
     /**
+     * @return the StateManager to be used for persisting the nodes of a remote cluster
+     */
+    StateManager getStateManager();
+
+    /**
+     * @return a PeerPersistence implementation based on the configured persistence target
+     */
+    PeerPersistence getPeerPersistence();
+
+    /**
      * @return a boolean indicating whether or not compression will be used to
      * transfer data to and from the remote instance
      */
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/StatePeerPersistence.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/StatePeerPersistence.java
new file mode 100644
index 0000000..185e8fd
--- /dev/null
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/StatePeerPersistence.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.remote.util.PeerStatusCache;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.HashMap;
+import java.util.Map;
+
+public class StatePeerPersistence extends AbstractPeerPersistence {
+
+    static final String STATE_KEY_PEERS = "peers";
+    static final String STATE_KEY_TRANSPORT_PROTOCOL = "protocol";
+    static final String STATE_KEY_PEERS_TIMESTAMP = "peers.ts";
+
+    private final StateManager stateManager;
+
+    public StatePeerPersistence(StateManager stateManager) {
+        this.stateManager = stateManager;
+    }
+
+    @Override
+    public void save(final PeerStatusCache peerStatusCache) throws IOException {
+        final StateMap state = stateManager.getState(Scope.LOCAL);
+        final Map<String, String> stateMap = state.toMap();
+        final Map<String, String> updatedStateMap = new HashMap<>(stateMap);
+        final StringBuilder peers = new StringBuilder();
+        write(peerStatusCache, peers::append);
+        updatedStateMap.put(STATE_KEY_PEERS, peers.toString());
+        updatedStateMap.put(STATE_KEY_PEERS_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
+        stateManager.setState(updatedStateMap, Scope.LOCAL);
+    }
+
+    @Override
+    public PeerStatusCache restore() throws IOException {
+        final StateMap state = stateManager.getState(Scope.LOCAL);
+        final String storedPeers = state.get(STATE_KEY_PEERS);
+        if (storedPeers != null && !storedPeers.isEmpty()) {
+            try (final BufferedReader reader = new BufferedReader(new StringReader(storedPeers))) {
+                return restorePeerStatuses(reader, Long.parseLong(state.get(STATE_KEY_PEERS_TIMESTAMP)));
+            }
+        }
+        return null;
+    }
+}
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
index e1516d2..690cdfd 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
@@ -34,6 +34,7 @@ import org.apache.nifi.remote.exception.ProtocolException;
 import org.apache.nifi.remote.exception.UnknownPortException;
 import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.remote.protocol.http.HttpClientTransaction;
 import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
 import org.apache.nifi.web.api.dto.remote.PeerDTO;
@@ -64,7 +65,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
     public HttpClient(final SiteToSiteClientConfig config) {
         super(config);
 
-        peerSelector = new PeerSelector(this, config.getPeerPersistenceFile());
+        peerSelector = new PeerSelector(this, config.getPeerPersistence());
         peerSelector.setEventReporter(config.getEventReporter());
 
         taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@@ -246,4 +247,9 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
             transaction.getCommunicant().getCommunicationsSession().interrupt();
         }
     }
+
+    @Override
+    public SiteToSiteTransportProtocol getTransportProtocol() {
+        return SiteToSiteTransportProtocol.HTTP;
+    }
 }
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index 17d66da..53bd963 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -23,6 +23,7 @@ import org.apache.nifi.remote.PeerStatus;
 import org.apache.nifi.remote.RemoteDestination;
 import org.apache.nifi.remote.RemoteResourceInitiator;
 import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.PeerPersistence;
 import org.apache.nifi.remote.client.PeerSelector;
 import org.apache.nifi.remote.client.PeerStatusProvider;
 import org.apache.nifi.remote.client.SiteInfoProvider;
@@ -36,6 +37,7 @@ import org.apache.nifi.remote.exception.UnknownPortException;
 import org.apache.nifi.remote.exception.UnreachableClusterException;
 import org.apache.nifi.remote.io.socket.SocketCommunicationsSession;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
 import org.apache.nifi.security.util.CertificateUtils;
 import org.slf4j.Logger;
@@ -44,7 +46,6 @@ import org.slf4j.LoggerFactory;
 import javax.net.ssl.SSLContext;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -91,8 +92,9 @@ public class EndpointConnectionPool implements PeerStatusProvider {
     private final InetAddress localAddress;
 
     public EndpointConnectionPool(final RemoteDestination remoteDestination, final int commsTimeoutMillis, final int idleExpirationMillis,
-        final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile, final SiteInfoProvider siteInfoProvider,
-        final InetAddress localAddress) {
+                                  final SSLContext sslContext, final EventReporter eventReporter,
+                                  final PeerPersistence peerPersistence, final SiteInfoProvider siteInfoProvider,
+                                  final InetAddress localAddress) {
         Objects.requireNonNull(remoteDestination, "Remote Destination/Port Identifier cannot be null");
 
         this.remoteDestination = remoteDestination;
@@ -104,7 +106,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
 
         this.siteInfoProvider = siteInfoProvider;
 
-        peerSelector = new PeerSelector(this, persistenceFile);
+        peerSelector = new PeerSelector(this, peerPersistence);
         peerSelector.setEventReporter(eventReporter);
 
         // Initialize a scheduled executor and run some maintenance tasks in the background to kill off old, unused
@@ -563,5 +565,8 @@ public class EndpointConnectionPool implements PeerStatusProvider {
         }
     }
 
-
+    @Override
+    public SiteToSiteTransportProtocol getTransportProtocol() {
+        return SiteToSiteTransportProtocol.RAW;
+    }
 }
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index 0999d57..ff8e0d6 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -52,7 +52,7 @@ public class SocketClient extends AbstractSiteToSiteClient {
                 createRemoteDestination(config.getPortIdentifier(), config.getPortName()),
                 commsTimeout,
                 (int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),
-                config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile(),
+                config.getSslContext(), config.getEventReporter(), config.getPeerPersistence(),
                 siteInfoProvider, config.getLocalAddress()
         );
 
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
index c52b4b7..acca34e 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
@@ -19,19 +19,19 @@ package org.apache.nifi.remote.util;
 import java.util.Set;
 
 import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 
 public class PeerStatusCache {
 
     private final Set<PeerStatus> statuses;
     private final long timestamp;
+    private final SiteToSiteTransportProtocol transportProtocol;
 
-    public PeerStatusCache(final Set<PeerStatus> statuses) {
-        this(statuses, System.currentTimeMillis());
-    }
-
-    public PeerStatusCache(final Set<PeerStatus> statuses, final long timestamp) {
+    public PeerStatusCache(final Set<PeerStatus> statuses, final long timestamp,
+                           final SiteToSiteTransportProtocol transportProtocol) {
         this.statuses = statuses;
         this.timestamp = timestamp;
+        this.transportProtocol = transportProtocol;
     }
 
     public Set<PeerStatus> getStatuses() {
@@ -41,4 +41,8 @@ public class PeerStatusCache {
     public long getTimestamp() {
         return timestamp;
     }
+
+    public SiteToSiteTransportProtocol getTransportProtocol() {
+        return transportProtocol;
+    }
 }
diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
index e29efd8..72dd9a6 100644
--- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
+++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
@@ -16,21 +16,34 @@
  */
 package org.apache.nifi.remote.client;
 
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.remote.PeerDescription;
 import org.apache.nifi.remote.PeerStatus;
 import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static java.util.stream.Collectors.groupingBy;
@@ -40,8 +53,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.when;
 
 public class TestPeerSelector {
 
@@ -255,4 +271,116 @@ public class TestPeerSelector {
         assert(!peers.isEmpty());
         assertEquals("Node1 should be returned since node 1 is the only available node.", bootstrapNode, peers.get(0).getPeerDescription());
     }
+
+    @Test
+    public void testPeerStatusManagedCache() throws Exception {
+        final PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
+        final StateManager stateManager = Mockito.mock(StateManager.class);
+        final StateMap stateMap = Mockito.mock(StateMap.class);
+        final Map<String, String> state = new HashMap<>();
+        state.put(StatePeerPersistence.STATE_KEY_PEERS, "RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n");
+        state.put(StatePeerPersistence.STATE_KEY_PEERS_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
+        when(peerStatusProvider.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW);
+        when(stateManager.getState(eq(Scope.LOCAL))).thenReturn(stateMap);
+        when(stateMap.get(anyString())).thenAnswer(invocation -> state.get(invocation.getArgument(0)));
+        doAnswer(invocation -> {
+            final Map<String, String> updatedMap = invocation.getArgument(0);
+            state.clear();
+            state.putAll(updatedMap);
+            return null;
+        }).when(stateManager).setState(any(), eq(Scope.LOCAL));
+
+        final PeerDescription bootstrapPeer = new PeerDescription("nifi0", 8081, false);
+        when(peerStatusProvider.getBootstrapPeerDescription()).thenReturn(bootstrapPeer);
+        when(peerStatusProvider.fetchRemotePeerStatuses(eq(bootstrapPeer)))
+            .thenReturn(Collections.singleton(new PeerStatus(bootstrapPeer, 1, true)));
+
+        // PeerSelector should restore peer statuses from managed cache.
+        PeerSelector peerSelector = new PeerSelector(peerStatusProvider, new StatePeerPersistence(stateManager));
+        peerSelector.refreshPeers();
+        assertEquals("Restored peers should be used",
+            "RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n", stateMap.get(StatePeerPersistence.STATE_KEY_PEERS));
+
+        // If the stored state is too old, PeerSelector refreshes peers.
+        state.put(StatePeerPersistence.STATE_KEY_PEERS_TIMESTAMP, String.valueOf(System.currentTimeMillis() - 120_000));
+        peerSelector = new PeerSelector(peerStatusProvider, new StatePeerPersistence(stateManager));
+        peerSelector.refreshPeers();
+        assertEquals("Peers should be refreshed",
+            "RAW\nnifi0:8081:false:true\n", stateMap.get(StatePeerPersistence.STATE_KEY_PEERS));
+    }
+
+    @Test
+    public void testPeerStatusManagedCacheDifferentProtocol() throws Exception {
+        final PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
+        final StateManager stateManager = Mockito.mock(StateManager.class);
+        final StateMap stateMap = Mockito.mock(StateMap.class);
+        final Map<String, String> state = new HashMap<>();
+        state.put(StatePeerPersistence.STATE_KEY_PEERS, "RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n");
+        state.put(StatePeerPersistence.STATE_KEY_PEERS_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
+        when(peerStatusProvider.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.HTTP);
+        when(stateManager.getState(eq(Scope.LOCAL))).thenReturn(stateMap);
+        when(stateMap.get(anyString())).thenAnswer(invocation -> state.get(invocation.getArgument(0)));
+        doAnswer(invocation -> {
+            final Map<String, String> updatedMap = invocation.getArgument(0);
+            state.clear();
+            state.putAll(updatedMap);
+            return null;
+        }).when(stateManager).setState(any(), eq(Scope.LOCAL));
+
+        final PeerDescription bootstrapPeer = new PeerDescription("nifi0", 8081, false);
+        when(peerStatusProvider.getBootstrapPeerDescription()).thenReturn(bootstrapPeer);
+        when(peerStatusProvider.fetchRemotePeerStatuses(eq(bootstrapPeer)))
+            .thenReturn(Collections.singleton(new PeerStatus(bootstrapPeer, 1, true)));
+
+        // PeerSelector should NOT restore peer statuses from managed cache because protocol changed.
+        PeerSelector peerSelector = new PeerSelector(peerStatusProvider, new StatePeerPersistence(stateManager));
+        peerSelector.refreshPeers();
+        assertEquals("Restored peers should NOT be used",
+            "HTTP\nnifi0:8081:false:true\n", stateMap.get(StatePeerPersistence.STATE_KEY_PEERS));
+    }
+
+    @Test
+    public void testPeerStatusFileCache() throws Exception {
+        final PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
+
+        final PeerDescription bootstrapPeer = new PeerDescription("nifi0", 8081, false);
+        when(peerStatusProvider.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW);
+        when(peerStatusProvider.getBootstrapPeerDescription()).thenReturn(bootstrapPeer);
+        when(peerStatusProvider.fetchRemotePeerStatuses(eq(bootstrapPeer)))
+            .thenReturn(Collections.singleton(new PeerStatus(bootstrapPeer, 1, true)));
+
+        final File file = File.createTempFile("peers", "txt");
+        file.deleteOnExit();
+
+        try (final FileOutputStream fos = new FileOutputStream(file)) {
+            fos.write("RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n".getBytes(StandardCharsets.UTF_8));
+        }
+
+        final Supplier<String> readFile = () -> {
+            try (final FileInputStream fin = new FileInputStream(file);
+                 final BufferedReader reader = new BufferedReader(new InputStreamReader(fin))) {
+                final StringBuilder lines = new StringBuilder();
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    lines.append(line).append("\n");
+                }
+                return lines.toString();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+
+        // PeerSelector should restore peer statuses from the file cache.
+        PeerSelector peerSelector = new PeerSelector(peerStatusProvider, new FilePeerPersistence(file));
+        peerSelector.refreshPeers();
+        assertEquals("Restored peers should be used",
+            "RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n", readFile.get());
+
+        // If the stored state is too old, PeerSelector refreshes peers.
+        file.setLastModified(System.currentTimeMillis() - 120_000);
+        peerSelector = new PeerSelector(peerStatusProvider, new FilePeerPersistence(file));
+        peerSelector.refreshPeers();
+        assertEquals("Peers should be refreshed",
+            "RAW\nnifi0:8081:false:true\n", readFile.get());
+    }
 }
diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
index c0b5e83..7c70a7a 100644
--- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
@@ -19,19 +19,21 @@ package org.apache.nifi.remote.client.socket;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.client.SiteToSiteClientConfig;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.remote.util.StandardDataPacket;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashMap;
@@ -137,6 +139,40 @@ public class TestSiteToSiteClient {
     }
 
     @Test
+    public void testSerializationWithStateManager() {
+        final StateManager stateManager = Mockito.mock(StateManager.class);
+        final SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+            .url("http://localhost:8080/nifi")
+            .portName("input")
+            .stateManager(stateManager)
+            .buildConfig();
+
+        final Kryo kryo = new Kryo();
+
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        final Output output = new Output(out);
+
+        try {
+            kryo.writeObject(output, clientConfig);
+        } finally {
+            output.close();
+        }
+
+        final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+        final Input input = new Input(in);
+
+        try {
+            SiteToSiteClientConfig clientConfig2 = kryo.readObject(input, SiteToSiteClient.StandardSiteToSiteClientConfig.class);
+            Assert.assertEquals(clientConfig.getUrls(), clientConfig2.getUrls());
+            // Serialization works, but the state manager is not serialized.
+            Assert.assertNotNull(clientConfig.getStateManager());
+            Assert.assertNull(clientConfig2.getStateManager());
+        } finally {
+            input.close();
+        }
+    }
+
+    @Test
     public void testGetUrlBackwardCompatibility() {
         final Set<String> urls = new LinkedHashSet<>();
         urls.add("http://node1:8080/nifi");
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index 39be045..f9c1021 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -19,6 +19,7 @@ package org.apache.nifi.groups;
 import org.apache.nifi.authorization.resource.ComponentAuthorizable;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.VersionedComponent;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.connectable.Positionable;
 import org.apache.nifi.controller.exception.CommunicationsException;
 import org.apache.nifi.events.EventReporter;
@@ -240,4 +241,6 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable,
     void verifyCanStopTransmitting();
 
     void verifyCanUpdate();
+
+    StateManager getStateManager();
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
index a4fe16a..fd74d54 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
@@ -211,7 +211,9 @@ public class StandardFlowManager implements FlowManager {
     }
 
     public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uris) {
-        return new StandardRemoteProcessGroup(requireNonNull(id), uris, null, processScheduler, bulletinRepository, sslContext, nifiProperties);
+        return new StandardRemoteProcessGroup(requireNonNull(id), uris, null,
+            processScheduler, bulletinRepository, sslContext, nifiProperties,
+            flowController.getStateManagerProvider().getStateManager(id));
     }
 
     public void setRootGroup(final ProcessGroup rootGroup) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 5256552..82c51ab 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -853,6 +853,14 @@ public final class StandardProcessGroup implements ProcessGroup {
             remoteGroup.getInputPorts().forEach(scheduler::onPortRemoved);
             remoteGroup.getOutputPorts().forEach(scheduler::onPortRemoved);
 
+            final StateManagerProvider stateManagerProvider = flowController.getStateManagerProvider();
+            scheduler.submitFrameworkTask(new Runnable() {
+                @Override
+                public void run() {
+                    stateManagerProvider.onComponentRemoved(remoteGroup.getIdentifier());
+                }
+            });
+
             remoteGroups.remove(remoteGroupId);
             LOG.info("{} removed from flow", remoteProcessGroup);
         } finally {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 48e2127..8514b5d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -18,7 +18,6 @@ package org.apache.nifi.remote;
 
 import static java.util.Objects.requireNonNull;
 
-import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
@@ -54,6 +53,7 @@ import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.resource.ResourceFactory;
 import org.apache.nifi.authorization.resource.ResourceType;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.connectable.Port;
@@ -100,6 +100,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
     private final ProcessScheduler scheduler;
     private final EventReporter eventReporter;
     private final NiFiProperties nifiProperties;
+    private final StateManager stateManager;
     private final long remoteContentsCacheExpiration;
     private volatile boolean initialized = false;
 
@@ -146,8 +147,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
     private final ScheduledExecutorService backgroundThreadExecutor;
 
     public StandardRemoteProcessGroup(final String id, final String targetUris, final ProcessGroup processGroup, final ProcessScheduler processScheduler,
-                                      final BulletinRepository bulletinRepository, final SSLContext sslContext, final NiFiProperties nifiProperties) {
+                                      final BulletinRepository bulletinRepository, final SSLContext sslContext, final NiFiProperties nifiProperties,
+                                      final StateManager stateManager) {
         this.nifiProperties = nifiProperties;
+        this.stateManager = stateManager;
         this.id = requireNonNull(id);
 
         this.targetUris = targetUris;
@@ -234,11 +237,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
     @Override
     public void onRemove() {
         backgroundThreadExecutor.shutdown();
-
-        final File file = getPeerPersistenceFile();
-        if (file.exists() && !file.delete()) {
-            logger.warn("Failed to remove {}. This file should be removed manually.", file);
-        }
     }
 
     @Override
@@ -1366,11 +1364,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
         }
     }
 
-    private File getPeerPersistenceFile() {
-        final File stateDir = nifiProperties.getPersistentStateDirectory();
-        return new File(stateDir, getIdentifier() + ".peers");
-    }
-
     @Override
     public Optional<String> getVersionedComponentId() {
         return Optional.ofNullable(versionedComponentId.get());
@@ -1393,4 +1386,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
             }
         }
     }
+
+    @Override
+    public StateManager getStateManager() {
+        return stateManager;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 3b4d630..bd2687e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -42,7 +42,6 @@ import org.apache.nifi.remote.exception.ProtocolException;
 import org.apache.nifi.remote.exception.UnknownPortException;
 import org.apache.nifi.remote.exception.UnreachableClusterException;
 import org.apache.nifi.remote.protocol.DataPacket;
-import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.remote.protocol.http.HttpProxy;
 import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
 import org.apache.nifi.remote.util.StandardDataPacket;
@@ -56,7 +55,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.SSLContext;
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -125,11 +123,6 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         this.targetId = targetId;
     }
 
-    private static File getPeerPersistenceFile(final String portId, final NiFiProperties nifiProperties, final SiteToSiteTransportProtocol transportProtocol) {
-        final File stateDir = nifiProperties.getPersistentStateDirectory();
-        return new File(stateDir, String.format("%s_%s.peers", portId, transportProtocol.name()));
-    }
-
     @Override
     public boolean isTargetRunning() {
         return targetRunning.get();
@@ -181,7 +174,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
                 .sslContext(sslContext)
                 .useCompression(isUseCompression())
                 .eventReporter(remoteGroup.getEventReporter())
-                .peerPersistenceFile(getPeerPersistenceFile(getIdentifier(), nifiProperties, remoteGroup.getTransportProtocol()))
+                .stateManager(remoteGroup.getStateManager())
                 .nodePenalizationPeriod(penalizationMillis, TimeUnit.MILLISECONDS)
                 .timeout(remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
                 .transportProtocol(remoteGroup.getTransportProtocol())
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 46fcd80..86ec526 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -1675,6 +1675,14 @@ public interface NiFiServiceFacade {
      */
     void clearReportingTaskState(String reportingTaskId);
 
+    /**
+     * Gets the state for the specified RemoteProcessGroup.
+     *
+     * @param remoteProcessGroupId the RemoteProcessGroup id
+     * @return  the component state
+     */
+    ComponentStateDTO getRemoteProcessGroupState(String remoteProcessGroupId);
+
 
     // ----------------------------------------
     // Label methods
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 6adc662..9c96cc5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -1537,6 +1537,16 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
+    public ComponentStateDTO getRemoteProcessGroupState(String remoteProcessGroupId) {
+        final StateMap clusterState = isClustered() ? remoteProcessGroupDAO.getState(remoteProcessGroupId, Scope.CLUSTER) : null;
+        final StateMap localState = remoteProcessGroupDAO.getState(remoteProcessGroupId, Scope.LOCAL);
+
+        // remote process group will be non null as it was already found when getting the state
+        final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
+        return dtoFactory.createComponentStateDTO(remoteProcessGroupId, remoteProcessGroup.getClass(), localState, clusterState);
+    }
+
+    @Override
     public ConnectionEntity deleteConnection(final Revision revision, final String connectionId) {
         final Connection connection = connectionDAO.getConnection(connectionId);
         final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
index f034082..575e01b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
@@ -30,10 +30,12 @@ import org.apache.nifi.authorization.resource.OperationAuthorizable;
 import org.apache.nifi.authorization.user.NiFiUserUtils;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.Revision;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.PositionDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
 import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
 import org.apache.nifi.web.api.entity.RemotePortRunStatusEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity;
@@ -827,6 +829,61 @@ public class RemoteProcessGroupResource extends ApplicationResource {
         );
     }
 
+    /**
+     * Gets the state for a RemoteProcessGroup.
+     *
+     * @param id The id of the RemoteProcessGroup
+     * @return a componentStateEntity
+     * @throws InterruptedException if interrupted
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/{id}/state")
+    @ApiOperation(
+        value = "Gets the state for a RemoteProcessGroup",
+        response = ComponentStateEntity.class,
+        authorizations = {
+            @Authorization(value = "Write - /remote-process-groups/{uuid}")
+        }
+    )
+    @ApiResponses(
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+        }
+    )
+    public Response getState(
+        @ApiParam(
+            value = "The remote process group id.",
+            required = true
+        )
+        @PathParam("id") final String id) throws InterruptedException {
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.GET);
+        }
+
+        // authorize access
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable authorizable = lookup.getRemoteProcessGroup(id);
+            authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+        });
+
+        // get the component state
+        final ComponentStateDTO state = serviceFacade.getRemoteProcessGroupState(id);
+
+        // generate the response entity
+        final ComponentStateEntity entity = new ComponentStateEntity();
+        entity.setComponentState(state);
+
+        // generate the response
+        return generateOkResponse(entity).build();
+    }
+
     private RemoteProcessGroupDTO createDTOWithDesiredRunStatus(final String id, final RemotePortRunStatusEntity entity) {
         final RemoteProcessGroupDTO dto = new RemoteProcessGroupDTO();
         dto.setId(id);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java
index 636addb..adb50af 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java
@@ -21,6 +21,7 @@ import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.groups.RemoteProcessGroup;
 
 public interface ComponentStateDAO {
 
@@ -71,4 +72,13 @@ public interface ComponentStateDAO {
      * @param reportingTask reporting task
      */
     void clearState(ReportingTaskNode reportingTask);
+
+    /**
+     * Gets the state map for the specified RemoteProcessGroup.
+     *
+     * @param remoteProcessGroup RemoteProcessGroup
+     * @param scope     scope
+     * @return state map
+     */
+    StateMap getState(RemoteProcessGroup remoteProcessGroup, Scope scope);
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/RemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/RemoteProcessGroupDAO.java
index 2542185..7446a34 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/RemoteProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/RemoteProcessGroupDAO.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.web.dao;
 
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
@@ -120,4 +122,12 @@ public interface RemoteProcessGroupDAO {
      * @param remoteProcessGroupId The remote process group id
      */
     void deleteRemoteProcessGroup(String remoteProcessGroupId);
+
+    /**
+     * Gets the state map of the specified RemoteProcessGroup for the given scope.
+     *
+     * @param remoteProcessGroupId the RemoteProcessGroup id
+     * @return state map
+     */
+    StateMap getState(String remoteProcessGroupId, Scope scope);
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
index f0a9094..c48186b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
@@ -23,6 +23,7 @@ import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.web.ResourceNotFoundException;
 import org.apache.nifi.web.dao.ComponentStateDAO;
 
@@ -90,6 +91,11 @@ public class StandardComponentStateDAO implements ComponentStateDAO {
         clearState(reportingTask.getIdentifier());
     }
 
+    @Override
+    public StateMap getState(RemoteProcessGroup remoteProcessGroup, Scope scope) {
+        return getState(remoteProcessGroup.getIdentifier(), scope);
+    }
+
     /* setters */
 
     public void setStateManagerProvider(StateManagerProvider stateManagerProvider) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
index b8399f7..274b5e4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
@@ -17,6 +17,8 @@
 package org.apache.nifi.web.dao.impl;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.connectable.Position;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.exception.ValidationException;
@@ -31,6 +33,7 @@ import org.apache.nifi.web.api.dto.BatchSettingsDTO;
 import org.apache.nifi.web.api.dto.DtoFactory;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
+import org.apache.nifi.web.dao.ComponentStateDAO;
 import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
 
 import java.util.ArrayList;
@@ -43,6 +46,7 @@ import static org.apache.nifi.util.StringUtils.isEmpty;
 public class StandardRemoteProcessGroupDAO extends ComponentDAO implements RemoteProcessGroupDAO {
 
     private FlowController flowController;
+    private ComponentStateDAO componentStateDAO;
 
     private RemoteProcessGroup locateRemoteProcessGroup(final String remoteProcessGroupId) {
         final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
@@ -465,7 +469,17 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
         remoteProcessGroup.getProcessGroup().removeRemoteProcessGroup(remoteProcessGroup);
     }
 
+    @Override
+    public StateMap getState(String remoteProcessGroupId, Scope scope) {
+        final RemoteProcessGroup remoteProcessGroup = locateRemoteProcessGroup(remoteProcessGroupId);
+        return componentStateDAO.getState(remoteProcessGroup, scope);
+    }
+
     public void setFlowController(FlowController flowController) {
         this.flowController = flowController;
     }
+
+    public void setComponentStateDAO(ComponentStateDAO componentStateDAO) {
+        this.componentStateDAO = componentStateDAO;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
index 72ffc2c..56b8d4d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
@@ -82,6 +82,7 @@
     </bean>
     <bean id="remoteProcessGroupDAO" class="org.apache.nifi.web.dao.impl.StandardRemoteProcessGroupDAO">
         <property name="flowController" ref="flowController"/>
+        <property name="componentStateDAO" ref="componentStateDAO"/>
     </bean>
     <bean id="labelDAO" class="org.apache.nifi.web.dao.impl.StandardLabelDAO">
         <property name="flowController" ref="flowController"/>


Mime
View raw message