nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (NIFI-1899) Create ListenSMTP & ExtractEmailAttachment processors
Date Fri, 01 Jul 2016 02:32:11 GMT

    [ https://issues.apache.org/jira/browse/NIFI-1899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358261#comment-15358261
] 

ASF GitHub Bot commented on NIFI-1899:
--------------------------------------

Github user trixpan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/483#discussion_r69240291
  
    --- Diff: nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java
---
    @@ -0,0 +1,383 @@
    +/*
    + *  Licensed to the Apache Software Foundation (ASF) under one or more
    + *  contributor license agreements.  See the NOTICE file distributed with
    + *  this work for additional information regarding copyright ownership.
    + *  The ASF licenses this file to You under the Apache License, Version 2.0
    + *  (the "License"); you may not use this file except in compliance with
    + *  the License.  You may obtain a copy of the License at
    + *
    + *  http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    +*/
    +package org.apache.nifi.processors.email;
    +
    +import javax.net.ssl.SSLContext;
    +import javax.net.ssl.SSLSocket;
    +import javax.net.ssl.SSLSocketFactory;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.net.InetSocketAddress;
    +import java.net.Socket;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processors.email.smtp.event.SmtpEvent;
    +import org.apache.nifi.processors.email.smtp.handler.SMTPMessageHandlerFactory;
    +import org.subethamail.smtp.server.SMTPServer;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.ssl.SSLContextService;
    +
    +@Tags({"listen", "email", "smtp"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@CapabilityDescription("This processor implements a lightweight SMTP server to an arbitrary
port, " +
    +        "allowing nifi to lister for incoming email. " +
    +        "" +
    +        "Note this server does not perform any email validation. If direct exposure to
the internet is sought," +
    +        "it may be a better idea to use the combination of NiFi and an industrial scale
MTA (e.g. Postfix)")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "mime.type", description = "The value used during
HELO"),
    +        @WritesAttribute(attribute = "smtp.helo", description = "The value used during
HELO"),
    +        @WritesAttribute(attribute = "smtp.certificates.*.serial", description = "The
serial numbers for each of the " +
    +                "certificates used by an TLS peer"),
    +        @WritesAttribute(attribute = "smtp.certificates.*.principal", description = "The
principal for each of the " +
    +                "certificates used by an TLS peer"),
    +        @WritesAttribute(attribute = "smtp.from", description = "The value used during
MAIL FROM (i.e. envelope)"),
    +        @WritesAttribute(attribute = "smtp.to", description = "The value used during
RCPT TO (i.e. envelope)")})
    +
    +public class ListenSMTP extends AbstractProcessor {
    +    public static final String SMTP_HELO = "smtp.helo";
    +    public static final String SMTP_FROM = "smtp.from";
    +    public static final String SMTP_TO = "smtp.to";
    +    public static final String MIME_TYPE = "message/rfc822";
    +
    +
    +    protected static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder()
    +            .name("SMTP_PORT")
    +            .displayName("Listening Port")
    +            .description("The TCP port the ListenSMTP processor will bind to." +
    +                    "NOTE that on Unix derivative operating  systems this port must "
+
    +                    "be higher than 1024 unless NiFi is running as with root user permissions.")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    protected static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder()
    +            .name("SMTP_HOSTNAME")
    +            .displayName("SMTP hostname")
    +            .description("The hostname to be embedded into the banner displayed when
an " +
    +                    "SMTP client connects to the processor TCP port .")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    protected static final PropertyDescriptor SMTP_MAXIMUM_CONNECTIONS = new PropertyDescriptor.Builder()
    +            .name("SMTP_MAXIMUM_CONNECTIONS")
    +            .displayName("Maximum number of SMTP connection")
    +            .description("The maximum number of simultaneous SMTP connections.")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .build();
    +
    +    protected static final PropertyDescriptor SMTP_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("SMTP_TIMEOUT")
    +            .displayName("SMTP connection timeout")
    +            .description("The maximum time to wait for an action of SMTP client.")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .build();
    +
    +    protected static final PropertyDescriptor SMTP_MAXIMUM_MSG_SIZE = new PropertyDescriptor.Builder()
    +            .name("SMTP_MAXIMUM_MSG_SIZE")
    +            .displayName("SMTP Maximum Message Size")
    +            .description("The maximum number of bytes the server will accept.")
    +            .required(true)
    +            .defaultValue("20MB")
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    +            .name("SSL_CONTEXT_SERVICE")
    +            .displayName("SSL Context Service")
    +            .description("The Controller Service to use in order to obtain an SSL Context.
If this property is set, " +
    +                    "messages will be received over a secure connection.")
    +            .required(false)
    +            .identifiesControllerService(SSLContextService.class)
    +            .build();
    +
    +    public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
    +            .name("CLIENT_AUTH")
    +            .displayName("Client Auth")
    +            .description("The client authentication policy to use for the SSL Context.
Only used if an SSL Context Service is provided.")
    +            .required(false)
    +            .allowableValues(SSLContextService.ClientAuth.NONE.toString(), SSLContextService.ClientAuth.REQUIRED.toString())
    +            .build();
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(final ValidationContext
validationContext) {
    +        final List<ValidationResult> results = new ArrayList<>();
    +
    +        final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
    +        final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
    +
    +        if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
    +            results.add(new ValidationResult.Builder()
    +                    .explanation("Client Auth must be provided when using TLS/SSL")
    +                    .valid(false).subject("Client Auth").build());
    +        }
    +
    +        return results;
    +
    +    }
    +
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("Extraction was successful")
    +            .build();
    +
    +    private Set<Relationship> relationships;
    +    private List<PropertyDescriptor> propertyDescriptors;
    +    private volatile LinkedBlockingQueue<SmtpEvent> messages;
    +
    +    private volatile SMTPServer server;
    +    private AtomicBoolean initialized = new AtomicBoolean(false);
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +
    +        final List<PropertyDescriptor> props = new ArrayList<>();
    +        props.add(SMTP_PORT);
    +        props.add(SMTP_HOSTNAME);
    +        props.add(SMTP_MAXIMUM_CONNECTIONS);
    +        props.add(SMTP_TIMEOUT);
    +        props.add(SMTP_MAXIMUM_MSG_SIZE);
    +        props.add(SSL_CONTEXT_SERVICE);
    +        props.add(CLIENT_AUTH);
    +        this.propertyDescriptors = Collections.unmodifiableList(props);
    +
    +    }
    +
    +    final ComponentLog logger = getLogger();
    +
    +    // Upon Schedule, reset the initialized state to false
    +    @OnScheduled
    +    public void onScheduled() {
    +        initialized.set(false);
    +    }
    +
    +    private synchronized void initializeSMTPServer(final ProcessContext context) throws
Exception {
    +        if (initialized.get()) {
    +            return;
    +        }
    +
    +        messages = new LinkedBlockingQueue<>(1024);
    +        String clientAuth = null;
    +
    +        new SMTPMessageHandlerFactory(messages);
    +
    +        // If an SSLContextService was provided then create an SSLContext to pass down
to the server
    +        SSLContext sslContext = null;
    +        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
    +        if (sslContextService != null) {
    +            clientAuth = context.getProperty(CLIENT_AUTH).getValue();
    +            sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth));
    +        }
    +
    +        final SSLContext finalSslContext = sslContext;
    +        final String finalClientAuth = clientAuth;
    +
    +        SMTPMessageHandlerFactory smtpMessageHandlerFactory = new SMTPMessageHandlerFactory(messages);
    +        final SMTPServer server = new SMTPServer(smtpMessageHandlerFactory) {
    +
    +            @Override
    +            public SSLSocket createSSLSocket(Socket socket) throws IOException {
    +                InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress();
    +
    +                SSLSocketFactory socketFactory = finalSslContext.getSocketFactory();
    +
    +                SSLSocket s = (SSLSocket) (socketFactory.createSocket(socket, remoteAddress.getHostName(),
socket.getPort(), true));
    +
    +                s.setUseClientMode(false);
    +
    +
    +                // For some reason the createSSLContext above is not enough to enforce
    +                // client side auth
    +                // If client auth is required...
    +                if (finalClientAuth == "REQUIRED") {
    --- End diff --
    
    addressed


> Create ListenSMTP & ExtractEmailAttachment processors
> -----------------------------------------------------
>
>                 Key: NIFI-1899
>                 URL: https://issues.apache.org/jira/browse/NIFI-1899
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Andre
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message