nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ald...@apache.org
Subject [1/2] nifi git commit: NIFI-3541: Add local network interface capability to site-to-site client and remote group and ports
Date Mon, 06 Mar 2017 16:18:12 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 000414e7e -> 16bde02ed


NIFI-3541: Add local network interface capability to site-to-site client and remote group and ports


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9e68f02f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9e68f02f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9e68f02f

Branch: refs/heads/master
Commit: 9e68f02f1fb7eb442f6c9580b46255f713d8b191
Parents: 000414e
Author: Mark Payne <markap14@hotmail.com>
Authored: Wed Mar 1 13:30:04 2017 -0500
Committer: Aldrin Piri <aldrin@apache.org>
Committed: Mon Mar 6 10:36:30 2017 -0500

----------------------------------------------------------------------
 .../nifi/remote/client/SiteToSiteClient.java    |  40 +++++-
 .../remote/client/SiteToSiteClientConfig.java   |   7 ++
 .../remote/util/SiteToSiteRestApiClient.java    | 109 ++++++++++-------
 .../apache/nifi/controller/AbstractPort.java    |  32 ++---
 .../apache/nifi/groups/RemoteProcessGroup.java  |  24 ++++
 .../org/apache/nifi/remote/RemoteGroupPort.java |   1 +
 .../nifi/remote/StandardRemoteProcessGroup.java | 122 ++++++++++++++-----
 .../nifi/remote/StandardRemoteGroupPort.java    |  27 +++-
 .../remote/TestStandardRemoteGroupPort.java     |   1 +
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  66 ++++++----
 10 files changed, 303 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9e68f02f/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 3d7bacc..daff70d 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -39,6 +39,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
+import java.net.InetAddress;
 import java.security.KeyStore;
 import java.security.SecureRandom;
 import java.util.LinkedHashSet;
@@ -168,6 +169,7 @@ public interface SiteToSiteClient extends Closeable {
         private int batchCount;
         private long batchSize;
         private long batchNanos;
+        private InetAddress localAddress;
         private SiteToSiteTransportProtocol transportProtocol = SiteToSiteTransportProtocol.RAW;
         private HttpProxy httpProxy;
 
@@ -198,6 +200,7 @@ public interface SiteToSiteClient extends Closeable {
             this.batchCount = config.getPreferredBatchCount();
             this.batchSize = config.getPreferredBatchSize();
             this.batchNanos = config.getPreferredBatchDuration(TimeUnit.NANOSECONDS);
+            this.localAddress = config.getLocalAddress();
             this.httpProxy = config.getHttpProxy();
 
             return this;
@@ -223,12 +226,31 @@ public interface SiteToSiteClient extends Closeable {
         }
 
         /**
-         * <p>Specifies the URLs of the remote NiFi instance.</p>
-         * <p>If this URL points to a NiFi node in a NiFi cluster, data transfer to and from
-         * nodes will be automatically load balanced across the different nodes.</p>
+         * <p>
+         * Specifies the local address to use when communicating with the remote NiFi instance.
+         * </p>
+         *
+         * @param localAddress the local address to use, or <code>null</code> to use <code>anyLocal</code> address.
+         * @return the builder
+         */
+        public Builder localAddress(final InetAddress localAddress) {
+            this.localAddress = localAddress;
+            return this;
+        }
+
+        /**
+         * <p>
+         * Specifies the URLs of the remote NiFi instance.
+         * </p>
+         * <p>
+         * If this URL points to a NiFi node in a NiFi cluster, data transfer to and from
+         * nodes will be automatically load balanced across the different nodes.
+         * </p>
          *
-         * <p>Multiple urls provide better connectivity with a NiFi cluster, able to connect
-         * to the target cluster at long as one of the specified urls is accessible.</p>
+         * <p>
+         * Multiple urls provide better connectivity with a NiFi cluster, able to connect
+         * to the target cluster at long as one of the specified urls is accessible.
+         * </p>
          *
          * @param urls urls of remote instance
          * @return the builder
@@ -717,6 +739,7 @@ public interface SiteToSiteClient extends Closeable {
         private final long batchSize;
         private final long batchNanos;
         private final HttpProxy httpProxy;
+        private final InetAddress localAddress;
 
         // some serialization frameworks require a default constructor
         private StandardSiteToSiteClientConfig() {
@@ -740,6 +763,7 @@ public interface SiteToSiteClient extends Closeable {
             this.batchNanos = 0;
             this.transportProtocol = null;
             this.httpProxy = null;
+            this.localAddress = null;
         }
 
         private StandardSiteToSiteClientConfig(final SiteToSiteClient.Builder builder) {
@@ -766,6 +790,7 @@ public interface SiteToSiteClient extends Closeable {
             this.batchNanos = builder.batchNanos;
             this.transportProtocol = builder.getTransportProtocol();
             this.httpProxy = builder.getHttpProxy();
+            this.localAddress = builder.localAddress;
         }
 
         @Override
@@ -931,5 +956,10 @@ public interface SiteToSiteClient extends Closeable {
         public HttpProxy getHttpProxy() {
             return httpProxy;
         }
+
+        @Override
+        public InetAddress getLocalAddress() {
+            return localAddress;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9e68f02f/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
index 5bdeee4..83e8328 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
@@ -18,6 +18,7 @@ package org.apache.nifi.remote.client;
 
 import java.io.File;
 import java.io.Serializable;
+import java.net.InetAddress;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -36,6 +37,7 @@ public interface SiteToSiteClientConfig extends Serializable {
      * for backward compatibility for implementations that does not expect multiple URLs.
      * {@link #getUrls()} should be used instead then should support multiple URLs when making
requests.
      */
+    @Deprecated
     String getUrl();
 
     /**
@@ -171,4 +173,9 @@ public interface SiteToSiteClientConfig extends Serializable {
      */
     HttpProxy getHttpProxy();
 
+    /**
+     * @return the InetAddress to bind to for the local address when creating a socket, or
+     *         {@code null} to bind to the {@code anyLocal} address.
+     */
+    InetAddress getLocalAddress();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9e68f02f/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
index 89da6a0..e6777b0 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
@@ -16,6 +16,55 @@
  */
 package org.apache.nifi.remote.util;
 
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_HEADER_NAME;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.Header;
 import org.apache.http.HttpEntity;
@@ -87,53 +136,6 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.net.ssl.SSLSession;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.security.cert.Certificate;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Predicate;
-import java.util.regex.Pattern;
-
-import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_HEADER_NAME;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;
-
 public class SiteToSiteRestApiClient implements Closeable {
 
     private static final String EVENT_CATEGORY = "Site-to-Site";
@@ -160,6 +162,7 @@ public class SiteToSiteRestApiClient implements Closeable {
     private CloseableHttpAsyncClient httpAsyncClient;
 
     private boolean compress = false;
+    private InetAddress localAddress = null;
     private long requestExpirationMillis = 0;
     private int serverTransactionTtl = 0;
     private int batchCount = 0;
@@ -239,6 +242,10 @@ public class SiteToSiteRestApiClient implements Closeable {
             .setConnectTimeout(connectTimeoutMillis)
             .setSocketTimeout(readTimeoutMillis);
 
+        if (localAddress != null) {
+            requestConfigBuilder.setLocalAddress(localAddress);
+        }
+        
         if (proxy != null) {
             requestConfigBuilder.setProxy(proxy.getHttpHost());
         }
@@ -916,6 +923,8 @@ public class SiteToSiteRestApiClient implements Closeable {
         extendingApiClient.transportProtocolVersionNegotiator = this.transportProtocolVersionNegotiator;
         extendingApiClient.connectTimeoutMillis = this.connectTimeoutMillis;
         extendingApiClient.readTimeoutMillis = this.readTimeoutMillis;
+        extendingApiClient.localAddress = this.localAddress;
+
         final int extendFrequency = serverTransactionTtl / 2;
 
         ttlExtendingFuture = ttlExtendTaskExecutor.scheduleWithFixedDelay(() -> {
@@ -1197,10 +1206,12 @@ public class SiteToSiteRestApiClient implements Closeable {
 
     public void setConnectTimeoutMillis(final int connectTimeoutMillis) {
         this.connectTimeoutMillis = connectTimeoutMillis;
+        setupRequestConfig();
     }
 
     public void setReadTimeoutMillis(final int readTimeoutMillis) {
         this.readTimeoutMillis = readTimeoutMillis;
+        setupRequestConfig();
     }
 
     public static String getFirstUrl(final String clusterUrlStr) {
@@ -1336,6 +1347,10 @@ public class SiteToSiteRestApiClient implements Closeable {
     public void setCompress(final boolean compress) {
         this.compress = compress;
     }
+    
+    public void setLocalAddress(final InetAddress localAddress) {
+        this.localAddress = localAddress;
+    }
 
     public void setRequestExpirationMillis(final long requestExpirationMillis) {
         if (requestExpirationMillis < 0) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/9e68f02f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
index 1177dad..4d061b8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
@@ -16,6 +16,22 @@
  */
 package org.apache.nifi.controller;
 
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.nifi.authorization.Resource;
@@ -36,22 +52,6 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.util.FormatUtils;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static java.util.Objects.requireNonNull;
-
 public abstract class AbstractPort implements Port {
 
     public static final Relationship PORT_RELATIONSHIP = new Relationship.Builder()

http://git-wip-us.apache.org/repos/asf/nifi/blob/9e68f02f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index 64e2ca0..cb1e6c8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -17,18 +17,22 @@
 package org.apache.nifi.groups;
 
 import org.apache.nifi.authorization.resource.ComponentAuthorizable;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.connectable.Positionable;
 import org.apache.nifi.controller.exception.CommunicationsException;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 
+import java.net.InetAddress;
+import java.util.Collection;
 import java.util.Date;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable {
 
+    @Override
     String getIdentifier();
 
     String getTargetUri();
@@ -155,6 +159,16 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable
     String getAuthorizationIssue();
 
     /**
+     * Validates the current configuration, returning ValidationResults for any
+     * invalid configuration parameter.
+     *
+     * @return Collection of validation result objects for any invalid findings
+     *         only. If the collection is empty then the component is valid. Guaranteed
+     *         non-null
+     */
+    Collection<ValidationResult> validate();
+
+    /**
      * @return the {@link EventReporter} that can be used to report any notable
      * events
      */
@@ -180,6 +194,16 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable
 
     void setProxyPassword(String proxyPassword);
 
+    void setNetworkInterface(String interfaceName);
+
+    String getNetworkInterface();
+
+    /**
+     * Returns the InetAddress that the will this instance will bind to when communicating with a
+     * remote NiFi instance, or <code>null</code> if no specific address has been specified
+     */
+    InetAddress getLocalAddress();
+
     /**
      * Initiates a task in the remote process group to re-initialize, as a
      * result of clustering changes

http://git-wip-us.apache.org/repos/asf/nifi/blob/9e68f02f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
index 8cad103..f8f4b20 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
@@ -33,6 +33,7 @@ public abstract class RemoteGroupPort extends AbstractPort implements Port, Remo
 
     public abstract TransferDirection getTransferDirection();
 
+    @Override
     public abstract boolean isUseCompression();
 
     public abstract void setUseCompression(boolean useCompression);

http://git-wip-us.apache.org/repos/asf/nifi/blob/9e68f02f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 855dab7..67c8f11 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -18,14 +18,15 @@ package org.apache.nifi.remote;
 
 import static java.util.Objects.requireNonNull;
 
-import com.sun.jersey.api.client.ClientHandlerException;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.ClientResponse.Status;
-import com.sun.jersey.api.client.UniformInterfaceException;
 import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
+import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -40,12 +41,15 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import javax.net.ssl.SSLContext;
 import javax.ws.rs.core.Response;
+
 import org.apache.nifi.authorization.Resource;
 import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.resource.ResourceFactory;
 import org.apache.nifi.authorization.resource.ResourceType;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.connectable.Port;
@@ -74,6 +78,11 @@ import org.apache.nifi.web.api.dto.PortDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.ClientResponse.Status;
+import com.sun.jersey.api.client.UniformInterfaceException;
+
 /**
  * Represents the Root Process Group of a remote NiFi Instance. Holds
  * information about that remote instance, as well as {@link IncomingPort}s and
@@ -99,7 +108,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
     private final AtomicReference<String> comments = new AtomicReference<>();
     private final AtomicReference<ProcessGroup> processGroup;
     private final AtomicBoolean transmitting = new AtomicBoolean(false);
-    private final FlowController flowController;
     private final SSLContext sslContext;
 
     private volatile String communicationsTimeout = "30 sec";
@@ -111,6 +119,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
     private volatile String proxyUser;
     private volatile String proxyPassword;
 
+    private String networkInterfaceName;
+    private InetAddress localAddress;
+    private ValidationResult nicValidationResult;
+
+
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
     private final Lock readLock = rwLock.readLock();
     private final Lock writeLock = rwLock.writeLock();
@@ -135,7 +148,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
                                       final FlowController flowController, final SSLContext
sslContext, final NiFiProperties nifiProperties) {
         this.nifiProperties = nifiProperties;
         this.id = requireNonNull(id);
-        this.flowController = requireNonNull(flowController);
 
         this.targetUris = targetUris;
         this.targetId = null;
@@ -354,6 +366,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
         return authorizationIssue;
     }
 
+    @Override
+    public Collection<ValidationResult> validate() {
+        return (nicValidationResult == null) ? Collections.emptyList() : Collections.singletonList(nicValidationResult);
+    }
+
     public int getInputPortCount() {
         readLock.lock();
         try {
@@ -606,7 +623,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
             }
 
             final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getName(), getProcessGroup(),
-                    this, TransferDirection.RECEIVE, ConnectableType.REMOTE_OUTPUT_PORT, sslContext, scheduler, nifiProperties);
+                this, TransferDirection.RECEIVE, ConnectableType.REMOTE_OUTPUT_PORT, sslContext, scheduler, nifiProperties);
             outputPorts.put(descriptor.getId(), port);
 
             if (descriptor.getConcurrentlySchedulableTaskCount() != null) {
@@ -672,7 +689,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
             }
 
             final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(),
descriptor.getName(), getProcessGroup(), this,
-                    TransferDirection.SEND, ConnectableType.REMOTE_INPUT_PORT, sslContext, scheduler, nifiProperties);
+                TransferDirection.SEND, ConnectableType.REMOTE_INPUT_PORT, sslContext, scheduler, nifiProperties);
 
             if (descriptor.getConcurrentlySchedulableTaskCount() != null) {
                 port.setMaxConcurrentTasks(descriptor.getConcurrentlySchedulableTaskCount());
@@ -741,15 +758,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
         }
     }
 
-    private ProcessGroup getRootGroup() {
-        return getRootGroup(getProcessGroup());
-    }
-
-    private ProcessGroup getRootGroup(final ProcessGroup context) {
-        final ProcessGroup parent = context.getParent();
-        return parent == null ? context : getRootGroup(parent);
-    }
-
     @Override
     public Date getLastRefreshTime() {
         readLock.lock();
@@ -856,10 +864,75 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
         }
     }
 
+    @Override
+    public String getNetworkInterface() {
+        readLock.lock();
+        try {
+            return networkInterfaceName;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
+    public void setNetworkInterface(final String interfaceName) {
+        writeLock.lock();
+        try {
+            this.networkInterfaceName = interfaceName;
+
+            try {
+                final Enumeration<InetAddress> inetAddresses = NetworkInterface.getByName(interfaceName).getInetAddresses();
+
+                if (inetAddresses.hasMoreElements()) {
+                    this.localAddress = inetAddresses.nextElement();
+                    this.nicValidationResult = null;
+                } else {
+                    this.localAddress = null;
+                    this.nicValidationResult = new ValidationResult.Builder()
+                        .input(interfaceName)
+                        .subject("Network Interface Name")
+                        .valid(false)
+                        .explanation("No IP Address could be found that is bound to the interface with name " + interfaceName)
+                        .build();
+                }
+            } catch (final Exception e) {
+                this.localAddress = null;
+                this.nicValidationResult = new ValidationResult.Builder()
+                    .input(interfaceName)
+                    .subject("Network Interface Name")
+                    .valid(false)
+                    .explanation("Could not obtain Network Interface with name " + interfaceName)
+                    .build();
+            }
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    @Override
+    public InetAddress getLocalAddress() {
+        readLock.lock();
+        try {
+            if (nicValidationResult != null && !nicValidationResult.isValid()) {
+                return null;
+            }
+
+            return localAddress;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
     private SiteToSiteRestApiClient getSiteToSiteRestApiClient() {
         SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, new HttpProxy(proxyHost, proxyPort, proxyUser, proxyPassword), getEventReporter());
         apiClient.setConnectTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS));
         apiClient.setReadTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS));
+
+        final InetAddress localAddress = getLocalAddress();
+        if (localAddress != null) {
+            apiClient.setLocalAddress(localAddress);
+        }
+
         return apiClient;
     }
 
@@ -886,17 +959,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
         return remotePorts;
     }
 
-    private RemoteProcessGroupPortDescriptor convertPortToRemotePortDescriptor(final Port port) {
-        final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
-        descriptor.setComments(port.getComments());
-        descriptor.setExists(true);
-        descriptor.setGroupId(port.getProcessGroup().getIdentifier());
-        descriptor.setId(port.getIdentifier());
-        descriptor.setName(port.getName());
-        descriptor.setTargetRunning(port.isRunning());
-        return descriptor;
-    }
-
     @Override
     public boolean isTransmitting() {
         return transmitting.get();
@@ -1216,6 +1278,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
                 if (port.hasIncomingConnection() && !port.getTargetExists()) {
                     throw new IllegalStateException(this.getIdentifier() + " has a Connection to Port " + port.getIdentifier() + ", but that Port no longer exists on the remote system");
                 }
+
+                port.verifyCanStart();
             }
 
             for (final StandardRemoteGroupPort port : outputPorts.values()) {
@@ -1226,6 +1290,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
                 if (!port.getConnections().isEmpty() && !port.getTargetExists()) {
                     throw new IllegalStateException(this.getIdentifier() + " has a Connection to Port " + port.getIdentifier() + ", but that Port no longer exists on the remote system");
                 }
+
+                port.verifyCanStart();
             }
         } finally {
             readLock.unlock();

http://git-wip-us.apache.org/repos/asf/nifi/blob/9e68f02f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index d8c055b..92931f2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -93,7 +94,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
 
     public StandardRemoteGroupPort(final String id, final String name, final ProcessGroup processGroup, final RemoteProcessGroup remoteGroup,
             final TransferDirection direction, final ConnectableType type, final SSLContext sslContext, final ProcessScheduler scheduler,
-            final NiFiProperties nifiProperties) {
+        final NiFiProperties nifiProperties) {
         // remote group port id needs to be unique but cannot just be the id of the port
         // in the remote group instance. this supports referencing the same remote
         // instance more than once.
@@ -167,6 +168,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
                 .timeout(remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
                 .transportProtocol(remoteGroup.getTransportProtocol())
                 .httpProxy(new HttpProxy(remoteGroup.getProxyHost(), remoteGroup.getProxyPort(), remoteGroup.getProxyUser(), remoteGroup.getProxyPassword()))
+                .localAddress(remoteGroup.getLocalAddress())
                 .build();
         clientRef.set(client);
     }
@@ -407,8 +409,19 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
 
     @Override
     public boolean isValid() {
-        return targetExists.get()
-                && (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT ? !getConnections(Relationship.ANONYMOUS).isEmpty() : true);
+        if (!targetExists.get()) {
+            return false;
+        }
+
+        if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
+            // if it's an output port, ensure that there is an outbound connection
+            return false;
+        }
+
+        final boolean groupValid = remoteGroup.validate().stream()
+            .allMatch(result -> result.isValid());
+
+        return groupValid;
     }
 
     @Override
@@ -444,6 +457,14 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && getIncomingConnections().isEmpty()) {
             throw new IllegalStateException("Port " + getName() + " has no incoming connections");
         }
+
+        final Optional<ValidationResult> resultOption = remoteGroup.validate().stream()
+            .filter(result -> !result.isValid())
+            .findFirst();
+
+        if (resultOption.isPresent()) {
+            throw new IllegalStateException("Remote Process Group is not valid: " + resultOption.get().toString());
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/9e68f02f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
index 2d48515..31cd154 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
@@ -110,6 +110,7 @@ public class TestStandardRemoteGroupPort {
                 connectableType = null;
                 break;
         }
+
         port = spy(new StandardRemoteGroupPort(ID, NAME,
                 processGroup, remoteGroup, direction, connectableType, null, scheduler, NiFiProperties.createBasicNiFiProperties(null, null)));
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/9e68f02f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 8049c12..113d491 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -16,6 +16,32 @@
  */
 package org.apache.nifi.web.api.dto;
 
+import java.text.Collator;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+
 import org.apache.nifi.action.Action;
 import org.apache.nifi.action.component.details.ComponentDetails;
 import org.apache.nifi.action.component.details.ExtensionDetails;
@@ -157,32 +183,6 @@ import org.apache.nifi.web.api.entity.TenantEntity;
 import org.apache.nifi.web.controller.ControllerFacade;
 import org.apache.nifi.web.revision.RevisionManager;
 
-import javax.ws.rs.WebApplicationException;
-import java.text.Collator;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
 public final class DtoFactory {
 
     @SuppressWarnings("rawtypes")
@@ -1550,7 +1550,19 @@ public final class DtoFactory {
         }
 
         if (group.getAuthorizationIssue() != null) {
-            dto.setAuthorizationIssues(Arrays.asList(group.getAuthorizationIssue()));
+            final List<String> authIssues = new ArrayList<>();
+            final String authIssue = group.getAuthorizationIssue();
+            if (authIssue != null) {
+                authIssues.add(authIssue);
+            }
+
+            final Collection<ValidationResult> validationResults = group.validate();
+            validationResults.stream()
+                .filter(result -> !result.isValid())
+                .map(result -> result.toString())
+                .forEach(str -> authIssues.add(str));
+
+            dto.setAuthorizationIssues(authIssues);
         }
 
         dto.setActiveRemoteInputPortCount(activeRemoteInputPortCount);


Mime
View raw message