Repository: nifi
Updated Branches:
refs/heads/master a9a9b6743 -> da6ad4f3b
NIFI-3670 - Expose the control of ListenSyslog's CLIENT_AUTH property to DFM
This closes #1720.
Signed-off-by: Bryan Bende <bbende@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/da6ad4f3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/da6ad4f3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/da6ad4f3
Branch: refs/heads/master
Commit: da6ad4f3bcdeb43783aafa9f8942c5fa2a7da20b
Parents: a9a9b67
Author: Andre F de Miranda <trixpan@users.noreply.github.com>
Authored: Mon May 1 22:32:51 2017 +1000
Committer: Bryan Bende <bbende@apache.org>
Committed: Mon May 1 10:41:12 2017 -0400
----------------------------------------------------------------------
.../nifi/processors/standard/ListenSyslog.java | 113 ++++++++++++-------
1 file changed, 72 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/da6ad4f3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 54d516f..ac874d5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -73,6 +73,7 @@ import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.standard.syslog.SyslogAttributes;
import org.apache.nifi.processors.standard.syslog.SyslogEvent;
import org.apache.nifi.processors.standard.syslog.SyslogParser;
+import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.ssl.SSLContextService;
@SupportsBatching
@@ -103,44 +104,49 @@ import org.apache.nifi.ssl.SSLContextService;
public class ListenSyslog extends AbstractSyslogProcessor {
public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new PropertyDescriptor.Builder()
- .name("Max Size of Message Queue")
- .description("The maximum size of the internal queue used to buffer messages
being transferred from the underlying channel to the processor. " +
+ .name("Max Size of Message Queue")
+ .displayName("Max Size of Message Queue")
+ .description("The maximum size of the internal queue used to buffer messages being
transferred from the underlying channel to the processor. " +
"Setting this value higher allows more messages to be buffered in memory
during surges of incoming messages, but increases the total " +
"memory used by the processor.")
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .defaultValue("10000")
- .required(true)
- .build();
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("10000")
+ .required(true)
+ .build();
public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder()
- .name("Receive Buffer Size")
- .description("The size of each buffer used to receive Syslog messages. Adjust
this value appropriately based on the expected size of the " +
+ .name("Receive Buffer Size")
+ .displayName("Receive Buffer Size")
+ .description("The size of each buffer used to receive Syslog messages. Adjust this
value appropriately based on the expected size of the " +
"incoming Syslog messages. When UDP is selected each buffer will hold
one Syslog message. When TCP is selected messages are read " +
"from an incoming connection until the buffer is full, or the connection
is closed. ")
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .defaultValue("65507 B")
- .required(true)
- .build();
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("65507 B")
+ .required(true)
+ .build();
public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder()
- .name("Max Size of Socket Buffer")
- .description("The maximum size of the socket buffer that should be used. This
is a suggestion to the Operating System " +
+ .name("Max Size of Socket Buffer")
+ .displayName("Max Size of Socket Buffer")
+ .description("The maximum size of the socket buffer that should be used. This is
a suggestion to the Operating System " +
"to indicate how big the socket buffer should be. If this value is set
too low, the buffer may fill up before " +
"the data can be read, and incoming data will be dropped.")
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .defaultValue("1 MB")
- .required(true)
- .build();
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("1 MB")
+ .required(true)
+ .build();
public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
- .name("Max Number of TCP Connections")
- .description("The maximum number of concurrent connections to accept Syslog messages
in TCP mode.")
- .addValidator(StandardValidators.createLongValidator(1, 65535, true))
- .defaultValue("2")
- .required(true)
- .build();
+ .name("Max Number of TCP Connections")
+ .displayName("Max Number of TCP Connections")
+ .description("The maximum number of concurrent connections to accept Syslog messages
in TCP mode.")
+ .addValidator(StandardValidators.createLongValidator(1, 65535, true))
+ .defaultValue("2")
+ .required(true)
+ .build();
public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Max Batch Size")
+ .displayName("Max Batch Size")
.description(
"The maximum number of Syslog events to add to a single FlowFile. If multiple
events are available, they will be concatenated along with "
- + "the <Message Delimiter> up to this configured maximum number
of messages")
+ + "the <Message Delimiter> up to this configured maximum number of
messages")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue("1")
@@ -148,6 +154,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.build();
public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
.name("Message Delimiter")
+ .displayName("Message Delimiter")
.description("Specifies the delimiter to place between Syslog messages when multiple
messages are bundled together (see <Max Batch Size> property).")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("\\n")
@@ -155,28 +162,38 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.build();
public static final PropertyDescriptor PARSE_MESSAGES = new PropertyDescriptor.Builder()
.name("Parse Messages")
+ .displayName("Parse Messages")
.description("Indicates if the processor should parse the Syslog messages. If set
to false, each outgoing FlowFile will only " +
- "contain the sender, protocol, and port, and no additional attributes.")
+ "contain the sender, protocol, and port, and no additional attributes.")
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
- .name("SSL Context Service")
- .description("The Controller Service to use in order to obtain an SSL Context.
If this property is set, syslog " +
+ .name("SSL Context Service")
+ .displayName("SSL Context Service")
+ .description("The Controller Service to use in order to obtain an SSL Context. If
this property is set, syslog " +
"messages will be received over a secure connection.")
- .required(false)
- .identifiesControllerService(SSLContextService.class)
- .build();
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
+ public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
+ .name("Client Auth")
+ .displayName("Client Auth")
+ .description("The client authentication policy to use for the SSL Context. Only used
if an SSL Context Service is provided.")
+ .required(false)
+ .allowableValues(SSLContextService.ClientAuth.values())
+ .defaultValue(SSLContextService.ClientAuth.REQUIRED.name())
+ .build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("Syslog messages that match one of the expected formats will be
sent out this relationship as a FlowFile per message.")
- .build();
+ .name("success")
+ .description("Syslog messages that match one of the expected formats will be sent
out this relationship as a FlowFile per message.")
+ .build();
public static final Relationship REL_INVALID = new Relationship.Builder()
- .name("invalid")
- .description("Syslog messages that do not match one of the expected formats will
be sent out this relationship as a FlowFile per message.")
- .build();
+ .name("invalid")
+ .description("Syslog messages that do not match one of the expected formats will
be sent out this relationship as a FlowFile per message.")
+ .build();
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
@@ -195,6 +212,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
descriptors.add(PORT);
descriptors.add(NETWORK_INTF_NAME);
descriptors.add(SSL_CONTEXT_SERVICE);
+ descriptors.add(CLIENT_AUTH);
descriptors.add(RECV_BUFFER_SIZE);
descriptors.add(MAX_MESSAGE_QUEUE_SIZE);
descriptors.add(MAX_SOCKET_BUFFER_SIZE);
@@ -252,6 +270,15 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.valid(false).subject("SSL Context").build());
}
+ // Validate CLIENT_AUTH
+ final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
+ if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
+ results.add(new ValidationResult.Builder()
+ .explanation("Client Auth must be provided when using TLS/SSL")
+ .valid(false).subject("Client Auth").build());
+ }
+
+
return results;
}
@@ -290,7 +317,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
// create either a UDP or TCP reader and call open() to bind to the given port
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
- channelDispatcher = createChannelReader(protocol, bufferPool, syslogEvents, maxConnections,
sslContextService, Charset.forName(charSet));
+ channelDispatcher = createChannelReader(context, protocol, bufferPool, syslogEvents,
maxConnections, sslContextService, Charset.forName(charSet));
channelDispatcher.open(nicIPAddress, port, maxChannelBufferSize);
final Thread readerThread = new Thread(channelDispatcher);
@@ -305,7 +332,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
// visible for testing to be overridden and provide a mock ChannelDispatcher if desired
- protected ChannelDispatcher createChannelReader(final String protocol, final BlockingQueue<ByteBuffer>
bufferPool,
+ protected ChannelDispatcher createChannelReader(final ProcessContext context, final String
protocol, final BlockingQueue<ByteBuffer> bufferPool,
final BlockingQueue<RawSyslogEvent>
events, final int maxConnections,
final SSLContextService sslContextService,
final Charset charset) throws IOException {
@@ -316,12 +343,16 @@ public class ListenSyslog extends AbstractSyslogProcessor {
} else {
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
SSLContext sslContext = null;
+ SslContextFactory.ClientAuth clientAuth = null;
+
if (sslContextService != null) {
- sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
+ final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
+ sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuthValue));
+ clientAuth = SslContextFactory.ClientAuth.valueOf(clientAuthValue);
}
final ChannelHandlerFactory<RawSyslogEvent<SocketChannel>, AsyncChannelDispatcher>
handlerFactory = new SocketChannelHandlerFactory<>();
- return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool,
events, getLogger(), maxConnections, sslContext, charset);
+ return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool,
events, getLogger(), maxConnections, sslContext, clientAuth, charset);
}
}
|