Repository: nifi
Updated Branches:
refs/heads/master 000414e7e -> 16bde02ed
NIFI-3541: Add local network interface capability to site-to-site client and remote group
and ports
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9e68f02f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9e68f02f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9e68f02f
Branch: refs/heads/master
Commit: 9e68f02f1fb7eb442f6c9580b46255f713d8b191
Parents: 000414e
Author: Mark Payne <markap14@hotmail.com>
Authored: Wed Mar 1 13:30:04 2017 -0500
Committer: Aldrin Piri <aldrin@apache.org>
Committed: Mon Mar 6 10:36:30 2017 -0500
----------------------------------------------------------------------
.../nifi/remote/client/SiteToSiteClient.java | 40 +++++-
.../remote/client/SiteToSiteClientConfig.java | 7 ++
.../remote/util/SiteToSiteRestApiClient.java | 109 ++++++++++-------
.../apache/nifi/controller/AbstractPort.java | 32 ++---
.../apache/nifi/groups/RemoteProcessGroup.java | 24 ++++
.../org/apache/nifi/remote/RemoteGroupPort.java | 1 +
.../nifi/remote/StandardRemoteProcessGroup.java | 122 ++++++++++++++-----
.../nifi/remote/StandardRemoteGroupPort.java | 27 +++-
.../remote/TestStandardRemoteGroupPort.java | 1 +
.../org/apache/nifi/web/api/dto/DtoFactory.java | 66 ++++++----
10 files changed, 303 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/9e68f02f/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 3d7bacc..daff70d 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -39,6 +39,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
+import java.net.InetAddress;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.util.LinkedHashSet;
@@ -168,6 +169,7 @@ public interface SiteToSiteClient extends Closeable {
private int batchCount;
private long batchSize;
private long batchNanos;
+ private InetAddress localAddress;
private SiteToSiteTransportProtocol transportProtocol = SiteToSiteTransportProtocol.RAW;
private HttpProxy httpProxy;
@@ -198,6 +200,7 @@ public interface SiteToSiteClient extends Closeable {
this.batchCount = config.getPreferredBatchCount();
this.batchSize = config.getPreferredBatchSize();
this.batchNanos = config.getPreferredBatchDuration(TimeUnit.NANOSECONDS);
+ this.localAddress = config.getLocalAddress();
this.httpProxy = config.getHttpProxy();
return this;
@@ -223,12 +226,31 @@ public interface SiteToSiteClient extends Closeable {
}
/**
- * <p>Specifies the URLs of the remote NiFi instance.</p>
- * <p>If this URL points to a NiFi node in a NiFi cluster, data transfer to
and from
- * nodes will be automatically load balanced across the different nodes.</p>
+ * <p>
+ * Specifies the local address to use when communicating with the remote NiFi instance.
+ * </p>
+ *
+ * @param localAddress the local address to use, or <code>null</code>
to use <code>anyLocal</code> address.
+ * @return the builder
+ */
+ public Builder localAddress(final InetAddress localAddress) {
+ this.localAddress = localAddress;
+ return this;
+ }
+
+ /**
+ * <p>
+ * Specifies the URLs of the remote NiFi instance.
+ * </p>
+ * <p>
+ * If this URL points to a NiFi node in a NiFi cluster, data transfer to and from
+ * nodes will be automatically load balanced across the different nodes.
+ * </p>
*
- * <p>Multiple urls provide better connectivity with a NiFi cluster, able to
connect
- * to the target cluster at long as one of the specified urls is accessible.</p>
+ * <p>
+ * Multiple urls provide better connectivity with a NiFi cluster, able to connect
+ * to the target cluster as long as one of the specified urls is accessible.
+ * </p>
*
* @param urls urls of remote instance
* @return the builder
@@ -717,6 +739,7 @@ public interface SiteToSiteClient extends Closeable {
private final long batchSize;
private final long batchNanos;
private final HttpProxy httpProxy;
+ private final InetAddress localAddress;
// some serialization frameworks require a default constructor
private StandardSiteToSiteClientConfig() {
@@ -740,6 +763,7 @@ public interface SiteToSiteClient extends Closeable {
this.batchNanos = 0;
this.transportProtocol = null;
this.httpProxy = null;
+ this.localAddress = null;
}
private StandardSiteToSiteClientConfig(final SiteToSiteClient.Builder builder) {
@@ -766,6 +790,7 @@ public interface SiteToSiteClient extends Closeable {
this.batchNanos = builder.batchNanos;
this.transportProtocol = builder.getTransportProtocol();
this.httpProxy = builder.getHttpProxy();
+ this.localAddress = builder.localAddress;
}
@Override
@@ -931,5 +956,10 @@ public interface SiteToSiteClient extends Closeable {
public HttpProxy getHttpProxy() {
return httpProxy;
}
+
+ @Override
+ public InetAddress getLocalAddress() {
+ return localAddress;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9e68f02f/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
index 5bdeee4..83e8328 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
@@ -18,6 +18,7 @@ package org.apache.nifi.remote.client;
import java.io.File;
import java.io.Serializable;
+import java.net.InetAddress;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -36,6 +37,7 @@ public interface SiteToSiteClientConfig extends Serializable {
* for backward compatibility for implementations that does not expect multiple URLs.
* {@link #getUrls()} should be used instead then should support multiple URLs when making
requests.
*/
+ @Deprecated
String getUrl();
/**
@@ -171,4 +173,9 @@ public interface SiteToSiteClientConfig extends Serializable {
*/
HttpProxy getHttpProxy();
+ /**
+ * @return the InetAddress to bind to for the local address when creating a socket, or
+ * {@code null} to bind to the {@code anyLocal} address.
+ */
+ InetAddress getLocalAddress();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9e68f02f/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
index 89da6a0..e6777b0 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
@@ -16,6 +16,55 @@
*/
package org.apache.nifi.remote.util;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_HEADER_NAME;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
@@ -87,53 +136,6 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.net.ssl.SSLSession;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.security.cert.Certificate;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Predicate;
-import java.util.regex.Pattern;
-
-import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_HEADER_NAME;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;
-
public class SiteToSiteRestApiClient implements Closeable {
private static final String EVENT_CATEGORY = "Site-to-Site";
@@ -160,6 +162,7 @@ public class SiteToSiteRestApiClient implements Closeable {
private CloseableHttpAsyncClient httpAsyncClient;
private boolean compress = false;
+ private InetAddress localAddress = null;
private long requestExpirationMillis = 0;
private int serverTransactionTtl = 0;
private int batchCount = 0;
@@ -239,6 +242,10 @@ public class SiteToSiteRestApiClient implements Closeable {
.setConnectTimeout(connectTimeoutMillis)
.setSocketTimeout(readTimeoutMillis);
+ if (localAddress != null) {
+ requestConfigBuilder.setLocalAddress(localAddress);
+ }
+
if (proxy != null) {
requestConfigBuilder.setProxy(proxy.getHttpHost());
}
@@ -916,6 +923,8 @@ public class SiteToSiteRestApiClient implements Closeable {
extendingApiClient.transportProtocolVersionNegotiator = this.transportProtocolVersionNegotiator;
extendingApiClient.connectTimeoutMillis = this.connectTimeoutMillis;
extendingApiClient.readTimeoutMillis = this.readTimeoutMillis;
+ extendingApiClient.localAddress = this.localAddress;
+
final int extendFrequency = serverTransactionTtl / 2;
ttlExtendingFuture = ttlExtendTaskExecutor.scheduleWithFixedDelay(() -> {
@@ -1197,10 +1206,12 @@ public class SiteToSiteRestApiClient implements Closeable {
public void setConnectTimeoutMillis(final int connectTimeoutMillis) {
this.connectTimeoutMillis = connectTimeoutMillis;
+ setupRequestConfig();
}
public void setReadTimeoutMillis(final int readTimeoutMillis) {
this.readTimeoutMillis = readTimeoutMillis;
+ setupRequestConfig();
}
public static String getFirstUrl(final String clusterUrlStr) {
@@ -1336,6 +1347,10 @@ public class SiteToSiteRestApiClient implements Closeable {
public void setCompress(final boolean compress) {
this.compress = compress;
}
+
+ public void setLocalAddress(final InetAddress localAddress) {
+ this.localAddress = localAddress;
+ }
public void setRequestExpirationMillis(final long requestExpirationMillis) {
if (requestExpirationMillis < 0) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/9e68f02f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
index 1177dad..4d061b8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
@@ -16,6 +16,22 @@
*/
package org.apache.nifi.controller;
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.nifi.authorization.Resource;
@@ -36,22 +52,6 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.FormatUtils;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static java.util.Objects.requireNonNull;
-
public abstract class AbstractPort implements Port {
public static final Relationship PORT_RELATIONSHIP = new Relationship.Builder()
http://git-wip-us.apache.org/repos/asf/nifi/blob/9e68f02f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index 64e2ca0..cb1e6c8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -17,18 +17,22 @@
package org.apache.nifi.groups;
import org.apache.nifi.authorization.resource.ComponentAuthorizable;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.Positionable;
import org.apache.nifi.controller.exception.CommunicationsException;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import java.net.InetAddress;
+import java.util.Collection;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable {
+ @Override
String getIdentifier();
String getTargetUri();
@@ -155,6 +159,16 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable
String getAuthorizationIssue();
/**
+ * Validates the current configuration, returning ValidationResults for any
+ * invalid configuration parameter.
+ *
+ * @return Collection of validation result objects for any invalid findings
+ * only. If the collection is empty then the component is valid. Guaranteed
+ * non-null
+ */
+ Collection<ValidationResult> validate();
+
+ /**
* @return the {@link EventReporter} that can be used to report any notable
* events
*/
@@ -180,6 +194,16 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable
void setProxyPassword(String proxyPassword);
+ void setNetworkInterface(String interfaceName);
+
+ String getNetworkInterface();
+
+ /**
+ * Returns the InetAddress that this instance will bind to when communicating
with a
+ * remote NiFi instance, or <code>null</code> if no specific address has
been specified
+ */
+ InetAddress getLocalAddress();
+
/**
* Initiates a task in the remote process group to re-initialize, as a
* result of clustering changes
http://git-wip-us.apache.org/repos/asf/nifi/blob/9e68f02f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
index 8cad103..f8f4b20 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
@@ -33,6 +33,7 @@ public abstract class RemoteGroupPort extends AbstractPort implements Port,
Remo
public abstract TransferDirection getTransferDirection();
+ @Override
public abstract boolean isUseCompression();
public abstract void setUseCompression(boolean useCompression);
http://git-wip-us.apache.org/repos/asf/nifi/blob/9e68f02f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 855dab7..67c8f11 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -18,14 +18,15 @@ package org.apache.nifi.remote;
import static java.util.Objects.requireNonNull;
-import com.sun.jersey.api.client.ClientHandlerException;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.ClientResponse.Status;
-import com.sun.jersey.api.client.UniformInterfaceException;
import java.io.File;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
+import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -40,12 +41,15 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;
+
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
@@ -74,6 +78,11 @@ import org.apache.nifi.web.api.dto.PortDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.ClientResponse.Status;
+import com.sun.jersey.api.client.UniformInterfaceException;
+
/**
* Represents the Root Process Group of a remote NiFi Instance. Holds
* information about that remote instance, as well as {@link IncomingPort}s and
@@ -99,7 +108,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private final AtomicReference<String> comments = new AtomicReference<>();
private final AtomicReference<ProcessGroup> processGroup;
private final AtomicBoolean transmitting = new AtomicBoolean(false);
- private final FlowController flowController;
private final SSLContext sslContext;
private volatile String communicationsTimeout = "30 sec";
@@ -111,6 +119,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup
{
private volatile String proxyUser;
private volatile String proxyPassword;
+ private String networkInterfaceName;
+ private InetAddress localAddress;
+ private ValidationResult nicValidationResult;
+
+
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
@@ -135,7 +148,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup
{
final FlowController flowController, final SSLContext
sslContext, final NiFiProperties nifiProperties) {
this.nifiProperties = nifiProperties;
this.id = requireNonNull(id);
- this.flowController = requireNonNull(flowController);
this.targetUris = targetUris;
this.targetId = null;
@@ -354,6 +366,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup
{
return authorizationIssue;
}
+ @Override
+ public Collection<ValidationResult> validate() {
+ return (nicValidationResult == null) ? Collections.emptyList() : Collections.singletonList(nicValidationResult);
+ }
+
public int getInputPortCount() {
readLock.lock();
try {
@@ -606,7 +623,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup
{
}
final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(),
descriptor.getName(), getProcessGroup(),
- this, TransferDirection.RECEIVE, ConnectableType.REMOTE_OUTPUT_PORT,
sslContext, scheduler, nifiProperties);
+ this, TransferDirection.RECEIVE, ConnectableType.REMOTE_OUTPUT_PORT, sslContext,
scheduler, nifiProperties);
outputPorts.put(descriptor.getId(), port);
if (descriptor.getConcurrentlySchedulableTaskCount() != null) {
@@ -672,7 +689,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup
{
}
final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(),
descriptor.getName(), getProcessGroup(), this,
- TransferDirection.SEND, ConnectableType.REMOTE_INPUT_PORT, sslContext,
scheduler, nifiProperties);
+ TransferDirection.SEND, ConnectableType.REMOTE_INPUT_PORT, sslContext, scheduler,
nifiProperties);
if (descriptor.getConcurrentlySchedulableTaskCount() != null) {
port.setMaxConcurrentTasks(descriptor.getConcurrentlySchedulableTaskCount());
@@ -741,15 +758,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup
{
}
}
- private ProcessGroup getRootGroup() {
- return getRootGroup(getProcessGroup());
- }
-
- private ProcessGroup getRootGroup(final ProcessGroup context) {
- final ProcessGroup parent = context.getParent();
- return parent == null ? context : getRootGroup(parent);
- }
-
@Override
public Date getLastRefreshTime() {
readLock.lock();
@@ -856,10 +864,75 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup
{
}
}
+ @Override
+ public String getNetworkInterface() {
+ readLock.lock();
+ try {
+ return networkInterfaceName;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public void setNetworkInterface(final String interfaceName) {
+ writeLock.lock();
+ try {
+ this.networkInterfaceName = interfaceName;
+
+ try {
+ final Enumeration<InetAddress> inetAddresses = NetworkInterface.getByName(interfaceName).getInetAddresses();
+
+ if (inetAddresses.hasMoreElements()) {
+ this.localAddress = inetAddresses.nextElement();
+ this.nicValidationResult = null;
+ } else {
+ this.localAddress = null;
+ this.nicValidationResult = new ValidationResult.Builder()
+ .input(interfaceName)
+ .subject("Network Interface Name")
+ .valid(false)
+ .explanation("No IP Address could be found that is bound to the interface
with name " + interfaceName)
+ .build();
+ }
+ } catch (final Exception e) {
+ this.localAddress = null;
+ this.nicValidationResult = new ValidationResult.Builder()
+ .input(interfaceName)
+ .subject("Network Interface Name")
+ .valid(false)
+ .explanation("Could not obtain Network Interface with name " + interfaceName)
+ .build();
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public InetAddress getLocalAddress() {
+ readLock.lock();
+ try {
+ if (nicValidationResult != null && !nicValidationResult.isValid()) {
+ return null;
+ }
+
+ return localAddress;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
private SiteToSiteRestApiClient getSiteToSiteRestApiClient() {
SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, new HttpProxy(proxyHost,
proxyPort, proxyUser, proxyPassword), getEventReporter());
apiClient.setConnectTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS));
apiClient.setReadTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS));
+
+ final InetAddress localAddress = getLocalAddress();
+ if (localAddress != null) {
+ apiClient.setLocalAddress(localAddress);
+ }
+
return apiClient;
}
@@ -886,17 +959,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup
{
return remotePorts;
}
- private RemoteProcessGroupPortDescriptor convertPortToRemotePortDescriptor(final Port
port) {
- final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
- descriptor.setComments(port.getComments());
- descriptor.setExists(true);
- descriptor.setGroupId(port.getProcessGroup().getIdentifier());
- descriptor.setId(port.getIdentifier());
- descriptor.setName(port.getName());
- descriptor.setTargetRunning(port.isRunning());
- return descriptor;
- }
-
@Override
public boolean isTransmitting() {
return transmitting.get();
@@ -1216,6 +1278,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup
{
if (port.hasIncomingConnection() && !port.getTargetExists()) {
throw new IllegalStateException(this.getIdentifier() + " has a Connection
to Port " + port.getIdentifier() + ", but that Port no longer exists on the remote system");
}
+
+ port.verifyCanStart();
}
for (final StandardRemoteGroupPort port : outputPorts.values()) {
@@ -1226,6 +1290,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup
{
if (!port.getConnections().isEmpty() && !port.getTargetExists())
{
throw new IllegalStateException(this.getIdentifier() + " has a Connection
to Port " + port.getIdentifier() + ", but that Port no longer exists on the remote system");
}
+
+ port.verifyCanStart();
}
} finally {
readLock.unlock();
http://git-wip-us.apache.org/repos/asf/nifi/blob/9e68f02f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index d8c055b..92931f2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -93,7 +94,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
public StandardRemoteGroupPort(final String id, final String name, final ProcessGroup
processGroup, final RemoteProcessGroup remoteGroup,
final TransferDirection direction, final ConnectableType type, final SSLContext
sslContext, final ProcessScheduler scheduler,
- final NiFiProperties nifiProperties) {
+ final NiFiProperties nifiProperties) {
// remote group port id needs to be unique but cannot just be the id of the port
// in the remote group instance. this supports referencing the same remote
// instance more than once.
@@ -167,6 +168,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
.timeout(remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
.transportProtocol(remoteGroup.getTransportProtocol())
.httpProxy(new HttpProxy(remoteGroup.getProxyHost(), remoteGroup.getProxyPort(), remoteGroup.getProxyUser(), remoteGroup.getProxyPassword()))
+ .localAddress(remoteGroup.getLocalAddress())
.build();
clientRef.set(client);
}
@@ -407,8 +409,19 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
@Override
public boolean isValid() {
- return targetExists.get()
- && (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT ? !getConnections(Relationship.ANONYMOUS).isEmpty() : true);
+ if (!targetExists.get()) {
+ return false;
+ }
+
+ if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
+ // if it's an output port, ensure that there is an outbound connection
+ return false;
+ }
+
+ final boolean groupValid = remoteGroup.validate().stream()
+ .allMatch(result -> result.isValid());
+
+ return groupValid;
}
@Override
@@ -444,6 +457,14 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && getIncomingConnections().isEmpty()) {
throw new IllegalStateException("Port " + getName() + " has no incoming connections");
}
+
+ final Optional<ValidationResult> resultOption = remoteGroup.validate().stream()
+ .filter(result -> !result.isValid())
+ .findFirst();
+
+ if (resultOption.isPresent()) {
+ throw new IllegalStateException("Remote Process Group is not valid: " + resultOption.get().toString());
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/9e68f02f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
index 2d48515..31cd154 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
@@ -110,6 +110,7 @@ public class TestStandardRemoteGroupPort {
connectableType = null;
break;
}
+
port = spy(new StandardRemoteGroupPort(ID, NAME,
processGroup, remoteGroup, direction, connectableType, null, scheduler, NiFiProperties.createBasicNiFiProperties(null, null)));
http://git-wip-us.apache.org/repos/asf/nifi/blob/9e68f02f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 8049c12..113d491 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -16,6 +16,32 @@
*/
package org.apache.nifi.web.api.dto;
+import java.text.Collator;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+
import org.apache.nifi.action.Action;
import org.apache.nifi.action.component.details.ComponentDetails;
import org.apache.nifi.action.component.details.ExtensionDetails;
@@ -157,32 +183,6 @@ import org.apache.nifi.web.api.entity.TenantEntity;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.revision.RevisionManager;
-import javax.ws.rs.WebApplicationException;
-import java.text.Collator;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
public final class DtoFactory {
@SuppressWarnings("rawtypes")
@@ -1550,7 +1550,19 @@ public final class DtoFactory {
}
if (group.getAuthorizationIssue() != null) {
- dto.setAuthorizationIssues(Arrays.asList(group.getAuthorizationIssue()));
+ final List<String> authIssues = new ArrayList<>();
+ final String authIssue = group.getAuthorizationIssue();
+ if (authIssue != null) {
+ authIssues.add(authIssue);
+ }
+
+ final Collection<ValidationResult> validationResults = group.validate();
+ validationResults.stream()
+ .filter(result -> !result.isValid())
+ .map(result -> result.toString())
+ .forEach(str -> authIssues.add(str));
+
+ dto.setAuthorizationIssues(authIssues);
}
dto.setActiveRemoteInputPortCount(activeRemoteInputPortCount);
|