nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jperciv...@apache.org
Subject [2/3] nifi git commit: NIFI-1899 - Introduce ExtractEmailAttachments and ExtractEmailHeaders processors - Introduce ListenSMTP (allows NiFi to receive data via email) - Addresses @ijokarumawak and @JPercivall PR comments
Date Thu, 21 Jul 2016 14:49:52 GMT
NIFI-1899 - Introduce ExtractEmailAttachments and ExtractEmailHeaders processors - Introduce ListenSMTP (allows NiFi to receive data via email) - Addresses @ijokarumawak and @JPercivall PR comments

Signed-off-by: jpercivall <joepercivall@yahoo.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4f672832
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4f672832
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4f672832

Branch: refs/heads/master
Commit: 4f672832c0efb95e324e8f48c995be6d50a1b67c
Parents: f352ea1
Author: Andre F de Miranda <trixpan@users.noreply.github.com>
Authored: Sat Jun 18 22:23:55 2016 +1000
Committer: jpercivall <joepercivall@yahoo.com>
Committed: Thu Jul 21 10:26:30 2016 -0400

----------------------------------------------------------------------
 nifi-assembly/NOTICE                            |   9 +
 nifi-assembly/pom.xml                           |   5 +
 .../nifi-email-bundle/nifi-email-nar/pom.xml    |  44 ++
 .../nifi-email-processors/pom.xml               |  73 +++
 .../email/ExtractEmailAttachments.java          | 212 +++++++++
 .../processors/email/ExtractEmailHeaders.java   | 235 ++++++++++
 .../nifi/processors/email/ListenSMTP.java       | 446 +++++++++++++++++++
 .../processors/email/smtp/event/SmtpEvent.java  | 134 ++++++
 .../smtp/handler/SMTPMessageHandlerFactory.java | 182 ++++++++
 .../email/smtp/handler/SMTPResultCode.java      |  83 ++++
 .../org.apache.nifi.processor.Processor         |  17 +
 .../processors/email/GenerateAttachment.java    | 110 +++++
 .../email/TestExtractEmailAttachments.java      | 113 +++++
 .../email/TestExtractEmailHeaders.java          |  93 ++++
 .../nifi/processors/email/TestListenSMTP.java   | 302 +++++++++++++
 .../src/test/resources/localhost-ks.jks         | Bin 0 -> 3512 bytes
 .../src/test/resources/localhost-ts.jks         | Bin 0 -> 1816 bytes
 nifi-nar-bundles/nifi-email-bundle/pom.xml      |  42 ++
 nifi-nar-bundles/pom.xml                        |   1 +
 pom.xml                                         |   6 +
 20 files changed, 2107 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4f672832/nifi-assembly/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index 7ccfc82..e6c9f15 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -829,6 +829,15 @@ The following binary components are provided under the Apache Software License v
        The code for the t-digest was originally authored by Ted Dunning
        A number of small but very helpful changes have been contributed by Adrien Grand (https://github.com/jpountz)
 
+    (ASLv2) subethasmtp
+       The following NOTICE information applies:
+
+       Copyright (C) 2006-2007 SubEthaMail.org
+
+    (ASLv2) Apache Commons Email
+       The following NOTICE information applies:
+       Apache Commons Email
+       Copyright 2002-2015 The Apache Software Foundation
 
 This includes derived works from the Apache Software License V2 library python-evtx (https://github.com/williballenthin/python-evtx)
 Copyright 2012, 2013 Willi Ballenthin william.ballenthin@mandiant.com

http://git-wip-us.apache.org/repos/asf/nifi/blob/4f672832/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 1564313..fb6bf87 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -298,6 +298,11 @@ language governing permissions and limitations under the License. -->
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-email-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-amqp-nar</artifactId>
 	        <type>nar</type>
 	    </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/4f672832/nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/pom.xml b/nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/pom.xml
new file mode 100644
index 0000000..e5fe4fc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+       Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-email-bundle</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-email-nar</artifactId>
+    <packaging>nar</packaging>
+    <properties>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-email-processors</artifactId>
+        </dependency>
+    </dependencies>
+</project>
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/4f672832/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml
new file mode 100644
index 0000000..655ac86
--- /dev/null
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+     license agreements. See the NOTICE file distributed with this work for additional
+     information regarding copyright ownership. The ASF licenses this file to
+     You under the Apache License, Version 2.0 (the "License"); you may not use
+     this file except in compliance with the License. You may obtain a copy of
+     the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+     by applicable law or agreed to in writing, software distributed under the
+     License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+     OF ANY KIND, either express or implied. See the License for the specific
+     language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+        <modelVersion>4.0.0</modelVersion>
+
+        <parent>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-email-bundle</artifactId>
+                <version>1.0.0-SNAPSHOT</version>
+        </parent>
+
+        <artifactId>nifi-email-processors</artifactId>
+        <packaging>jar</packaging>
+
+        <dependencies>
+                <dependency>
+                        <groupId>org.apache.nifi</groupId>
+                        <artifactId>nifi-api</artifactId>
+                </dependency>
+                <dependency>
+                        <groupId>org.apache.nifi</groupId>
+                        <artifactId>nifi-processor-utils</artifactId>
+                </dependency>
+                <dependency>
+                        <groupId>javax.mail</groupId>
+                        <artifactId>mail</artifactId>
+                </dependency>
+                <dependency>
+                        <groupId>org.apache.commons</groupId>
+                        <artifactId>commons-email</artifactId>
+                        <version>1.4</version>
+                </dependency>
+                <dependency>
+                    <groupId>org.subethamail</groupId>
+                    <artifactId>subethasmtp</artifactId>
+                    <version>3.1.7</version>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.nifi</groupId>
+                    <artifactId>nifi-ssl-context-service-api</artifactId>
+                </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-ssl-context-service</artifactId>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-mock</artifactId>
+                <scope>test</scope>
+            </dependency>
+        </dependencies>
+        <build>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.rat</groupId>
+                    <artifactId>apache-rat-plugin</artifactId>
+                    <configuration>
+                    </configuration>
+                </plugin>
+            </plugins>
+        </build>
+</project>
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/4f672832/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailAttachments.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailAttachments.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailAttachments.java
new file mode 100644
index 0000000..18c74e9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailAttachments.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.email;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.Date;
+
+import javax.activation.DataSource;
+import javax.mail.Address;
+import javax.mail.MessagingException;
+import javax.mail.Session;
+import javax.mail.internet.MimeMessage;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.mail.util.MimeMessageParser;
+
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileHandlingException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.stream.io.BufferedInputStream;
+
+
+
+@SupportsBatching
+@EventDriven
+@SideEffectFree
+@Tags({"split", "email"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Extract attachments from a mime formatted email file, splitting them into individual flowfiles.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename ", description = "The filename of the attachment"),
+        @WritesAttribute(attribute = "email.attachment.parent.filename ", description = "The filename of the parent FlowFile"),
+        @WritesAttribute(attribute = "email.attachment.parent.uuid", description = "The UUID of the original FlowFile."),
+        @WritesAttribute(attribute = "mime.type", description = "The mime type of the attachment.")})
+
+public class ExtractEmailAttachments extends AbstractProcessor {
+    public static final String ATTACHMENT_ORIGINAL_FILENAME = "email.attachment.parent.filename";
+    public static final String ATTACHMENT_ORIGINAL_UUID = "email.attachment.parent.uuid";
+
+    public static final Relationship REL_ATTACHMENTS = new Relationship.Builder()
+            .name("attachments")
+            .description("Each individual attachment will be routed to the attachments relationship")
+            .build();
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("The original file")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Flowfiles that could not be parsed")
+            .build();
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> descriptors;
+
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_ATTACHMENTS);
+        relationships.add(REL_ORIGINAL);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+        this.descriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        final ComponentLog logger = getLogger();
+        final FlowFile originalFlowFile = session.get();
+        if (originalFlowFile == null) {
+            return;
+        }
+        final List<FlowFile> attachmentsList = new ArrayList<>();
+        final List<FlowFile> invalidFlowFilesList = new ArrayList<>();
+        final List<FlowFile> originalFlowFilesList = new ArrayList<>();
+
+        session.read(originalFlowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream rawIn) throws IOException {
+                    try (final InputStream in = new BufferedInputStream(rawIn)) {
+                        Properties props = new Properties();
+                        Session mailSession = Session.getDefaultInstance(props, null);
+                        MimeMessage originalMessage = new MimeMessage(mailSession, in);
+                        MimeMessageParser parser = new MimeMessageParser(originalMessage).parse();
+                        // RFC-2822 determines that a message must have a "From:" header
+                        // if a message lacks the field, it is flagged as invalid
+                        Address[] from = originalMessage.getFrom();
+                        Date sentDate = originalMessage.getSentDate();
+                        if (from == null || sentDate == null) {
+                            // Throws MessageException due to lack of minimum required headers
+                            throw new MessagingException("Message failed RFC2822 validation");
+                        }
+                        originalFlowFilesList.add(originalFlowFile);
+                        if (parser.hasAttachments()) {
+                            final String originalFlowFileName = originalFlowFile.getAttribute(CoreAttributes.FILENAME.key());
+                            try {
+                                for (final DataSource data : parser.getAttachmentList()) {
+                                    FlowFile split = session.create(originalFlowFile);
+                                    final Map<String, String> attributes = new HashMap<>();
+                                    if (StringUtils.isNotBlank(data.getName())) {
+                                        attributes.put(CoreAttributes.FILENAME.key(), data.getName());
+                                    }
+                                    if (StringUtils.isNotBlank(data.getContentType())) {
+                                        attributes.put(CoreAttributes.MIME_TYPE.key(), data.getContentType());
+                                    }
+                                    String parentUuid = originalFlowFile.getAttribute(CoreAttributes.UUID.key());
+                                    attributes.put(ATTACHMENT_ORIGINAL_UUID, parentUuid);
+                                    attributes.put(ATTACHMENT_ORIGINAL_FILENAME, originalFlowFileName);
+                                    split = session.append(split, new OutputStreamCallback() {
+                                        @Override
+                                        public void process(OutputStream out) throws IOException {
+                                            IOUtils.copy(data.getInputStream(), out);
+                                        }
+                                    });
+                                    split = session.putAllAttributes(split, attributes);
+                                    attachmentsList.add(split);
+                                }
+                            } catch (FlowFileHandlingException e) {
+                                // Something went wrong
+                                // Removing splits that may have been created
+                                session.remove(attachmentsList);
+                                // Removing the original flow from its list
+                                originalFlowFilesList.remove(originalFlowFile);
+                                logger.error("Flowfile {} triggered error {} while processing message removing generated FlowFiles from sessions", new Object[]{originalFlowFile, e});
+                                invalidFlowFilesList.add(originalFlowFile);
+                            }
+                        }
+                    } catch (Exception e) {
+                        // Another error hit...
+                        // Removing the original flow from its list
+                        originalFlowFilesList.remove(originalFlowFile);
+                        logger.error("Could not parse the flowfile {} as an email, treating as failure", new Object[]{originalFlowFile, e});
+                        // Message is invalid or triggered an error during parsing
+                        invalidFlowFilesList.add(originalFlowFile);
+                    }
+                }
+        });
+
+        session.transfer(attachmentsList, REL_ATTACHMENTS);
+
+        // As per above code, originalFlowfile may be routed to invalid or
+        // original depending on RFC2822 compliance.
+        session.transfer(invalidFlowFilesList, REL_FAILURE);
+        session.transfer(originalFlowFilesList, REL_ORIGINAL);
+
+        if (attachmentsList.size() > 10) {
+            logger.info("Split {} into {} files", new Object[]{originalFlowFile, attachmentsList.size()});
+        } else if (attachmentsList.size() > 1){
+            logger.info("Split {} into {} files: {}", new Object[]{originalFlowFile, attachmentsList.size(), attachmentsList});
+        }
+     }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+
+}
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/4f672832/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailHeaders.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailHeaders.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailHeaders.java
new file mode 100644
index 0000000..8aa4507
--- /dev/null
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailHeaders.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.email;
+
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.mail.util.MimeMessageParser;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.BufferedInputStream;
+
+import javax.mail.Address;
+import javax.mail.Header;
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import javax.mail.Session;
+import javax.mail.internet.MimeMessage;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+@SupportsBatching
+@EventDriven
+@SideEffectFree
+@Tags({"split", "email"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Using the flowfile content as source of data, extract header from an RFC  compliant  email file adding the relevant attributes to the flowfile. " +
+        "This processor does not perform extensive RFC validation but still requires a bare minimum compliance with RFC 2822")
+@WritesAttributes({
+        @WritesAttribute(attribute = "email.headers.bcc.*", description = "Each individual BCC recipient (if available)"),
+        @WritesAttribute(attribute = "email.headers.cc.*", description = "Each individual CC recipient (if available)"),
+        @WritesAttribute(attribute = "email.headers.from.*", description = "Each individual mailbox contained in the From  of the Email (array as per RFC-2822)"),
+        @WritesAttribute(attribute = "email.headers.message-id", description = "The value of the Message-ID header (if available)"),
+        @WritesAttribute(attribute = "email.headers.received_date", description = "The Received-Date of the message (if available)"),
+        @WritesAttribute(attribute = "email.headers.sent_date", description = "Date the message was sent"),
+        @WritesAttribute(attribute = "email.headers.subject", description = "Subject of the message (if available)"),
+        @WritesAttribute(attribute = "email.headers.to.*", description = "Each individual TO recipient (if available)"),
+        @WritesAttribute(attribute = "email.attachment_count", description = "Number of attachments of the message" )})
+
+public class ExtractEmailHeaders extends AbstractProcessor {
+    public static final String EMAIL_HEADER_BCC = "email.headers.bcc";
+    public static final String EMAIL_HEADER_CC = "email.headers.cc";
+    public static final String EMAIL_HEADER_FROM = "email.headers.from";
+    public static final String EMAIL_HEADER_MESSAGE_ID = "email.headers.message-id";
+    public static final String EMAIL_HEADER_RECV_DATE = "email.headers.received_date";
+    public static final String EMAIL_HEADER_SENT_DATE = "email.headers.sent_date";
+    public static final String EMAIL_HEADER_SUBJECT = "email.headers.subject";
+    public static final String EMAIL_HEADER_TO = "email.headers.to";
+    public static final String EMAIL_ATTACHMENT_COUNT = "email.attachment_count";
+
+    public static final PropertyDescriptor CAPTURED_HEADERS = new PropertyDescriptor.Builder()
+            .name("CAPTURED_HEADERS")
+            .displayName("Additional Header List")
+            .description("COLON separated list of additional headers to be extracted from the flowfile content." +
+                    "NOTE the header key is case insensitive and will be matched as lower-case." +
+                    " Values will respect email contents.")
+            .required(false)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("x-mailer")
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Extraction was successful")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Flowfiles that could not be parsed as a RFC-2822 compliant message")
+            .build();
+
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> descriptors;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+        descriptors.add(CAPTURED_HEADERS);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        final ComponentLog logger = getLogger();
+
+        final List<FlowFile> invalidFlowFilesList = new ArrayList<>();
+        final List<FlowFile> processedFlowFilesList = new ArrayList<>();
+
+        final FlowFile originalFlowFile = session.get();
+        if (originalFlowFile == null) {
+            return;
+        }
+
+        final List<String> capturedHeadersList = Arrays.asList(context.getProperty(CAPTURED_HEADERS).getValue().toLowerCase().split(":"));
+
+        final Map<String, String> attributes = new HashMap<>();
+        session.read(originalFlowFile, new InputStreamCallback() {
+            @Override
+            public void process(final InputStream rawIn) throws IOException {
+                try (final InputStream in = new BufferedInputStream(rawIn)) {
+                    Properties props = new Properties();
+                    Session mailSession = Session.getDefaultInstance(props, null);
+                    MimeMessage originalMessage = new MimeMessage(mailSession, in);
+                    MimeMessageParser parser = new MimeMessageParser(originalMessage).parse();
+                    // RFC-2822 determines that a message must have a "From:" header
+                    // if a message lacks the field, it is flagged as invalid
+                    Address[] from = originalMessage.getFrom();
+                    Date sentDate = originalMessage.getSentDate();
+                    if (from == null || sentDate == null ) {
+                        // Throws MessageException due to lack of minimum required headers
+                        throw new MessagingException("Message failed RFC2822 validation");
+                    } else if (capturedHeadersList.size() > 0){
+                        Enumeration headers = originalMessage.getAllHeaders();
+                        while (headers.hasMoreElements()) {
+                            Header header = (Header) headers.nextElement();
+                            if (StringUtils.isNotEmpty(header.getValue())
+                                    && capturedHeadersList.contains(header.getName().toLowerCase())) {
+                                attributes.put("email.headers." + header.getName().toLowerCase(), header.getValue());
+                            }
+                        }
+                    }
+                    if (Array.getLength(originalMessage.getAllRecipients()) > 0) {
+                        for (int toCount = 0; toCount < ArrayUtils.getLength(originalMessage.getRecipients(Message.RecipientType.TO)); toCount++) {
+                            attributes.put(EMAIL_HEADER_TO + "." + toCount, originalMessage.getRecipients(Message.RecipientType.TO)[toCount].toString());
+                        }
+                        for (int toCount = 0; toCount < ArrayUtils.getLength(originalMessage.getRecipients(Message.RecipientType.BCC)); toCount++) {
+                            attributes.put(EMAIL_HEADER_BCC + "." + toCount, originalMessage.getRecipients(Message.RecipientType.BCC)[toCount].toString());
+                        }
+                        for (int toCount = 0; toCount < ArrayUtils.getLength(originalMessage.getRecipients(Message.RecipientType.CC)); toCount++) {
+                            attributes.put(EMAIL_HEADER_CC + "." + toCount, originalMessage.getRecipients(Message.RecipientType.CC)[toCount].toString());
+                        }
+                    }
+                    // Incredibly enough RFC-2822 specifies From as a "mailbox-list", so an array is returned by getFrom
+                    for (int toCount = 0; toCount < ArrayUtils.getLength(originalMessage.getFrom()); toCount++) {
+                        attributes.put(EMAIL_HEADER_FROM + "." + toCount, originalMessage.getFrom()[toCount].toString());
+                    }
+                    if (StringUtils.isNotEmpty(originalMessage.getMessageID())) {
+                        attributes.put(EMAIL_HEADER_MESSAGE_ID, originalMessage.getMessageID());
+                    }
+                    if (originalMessage.getReceivedDate() != null) {
+                        attributes.put(EMAIL_HEADER_RECV_DATE, originalMessage.getReceivedDate().toString());
+                    }
+                    if (originalMessage.getSentDate() != null) {
+                        attributes.put(EMAIL_HEADER_SENT_DATE, originalMessage.getSentDate().toString());
+                    }
+                    if (StringUtils.isNotEmpty(originalMessage.getSubject())) {
+                        attributes.put(EMAIL_HEADER_SUBJECT, originalMessage.getSubject());
+                    }
+                    // Zeroes EMAIL_ATTACHMENT_COUNT
+                    attributes.put(EMAIL_ATTACHMENT_COUNT, "0");
+                    // But insert correct value if attachments are present
+                    if (parser.hasAttachments()) {
+                        attributes.put(EMAIL_ATTACHMENT_COUNT, String.valueOf(parser.getAttachmentList().size()));
+                    }
+
+                } catch (Exception e) {
+                    // Message is invalid or triggered an error during parsing
+                    attributes.clear();
+                    logger.error("Could not parse the flowfile {} as an email, treating as failure", new Object[]{originalFlowFile, e});
+                    invalidFlowFilesList.add(originalFlowFile);
+                }
+            }
+        });
+
+        if (attributes.size() > 0) {
+            FlowFile updatedFlowFile = session.putAllAttributes(originalFlowFile, attributes);
+            logger.info("Extracted {} into {} files", new Object[]{attributes.size(), updatedFlowFile});
+            processedFlowFilesList.add(updatedFlowFile);
+        }
+
+        session.transfer(processedFlowFilesList, REL_SUCCESS);
+        session.transfer(invalidFlowFilesList, REL_FAILURE);
+
+    }
+
+    /**
+     * Exposes the relationship set held in this processor's {@code relationships} field.
+     *
+     * @return the value of the {@code relationships} field
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * Exposes the property descriptor list held in this processor's {@code descriptors} field.
+     *
+     * @return the value of the {@code descriptors} field
+     */
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return this.descriptors;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/4f672832/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java
new file mode 100644
index 0000000..51b1d2d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java
@@ -0,0 +1,446 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+*/
+package org.apache.nifi.processors.email;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.subethamail.smtp.server.SMTPServer;
+
+
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.ssl.SSLContextService;
+
+import org.apache.nifi.processors.email.smtp.event.SmtpEvent;
+import org.apache.nifi.processors.email.smtp.handler.SMTPResultCode;
+import org.apache.nifi.processors.email.smtp.handler.SMTPMessageHandlerFactory;
+
+/**
+ * Processor that embeds a SubEtha SMTP server and turns every email it receives
+ * into a flowfile routed to {@link #REL_SUCCESS}.
+ * <p>
+ * The SMTP server is created lazily by the first {@code onTrigger} call after the
+ * processor is scheduled. Messages accepted by the server are handed over through an
+ * in-memory queue; {@code onTrigger} drains that queue, writes each message to a
+ * flowfile and then waits for the SMTP handler to acknowledge the message.
+ */
+@Tags({"listen", "email", "smtp"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("This processor implements a lightweight SMTP server to an arbitrary port, " +
+        "allowing nifi to listen for incoming email. " +
+        "" +
+        "Note this server does not perform any email validation. If direct exposure to the internet is sought, " +
+        "it may be a better idea to use the combination of NiFi and an industrial scale MTA (e.g. Postfix)")
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "The mime type of the message, always message/rfc822"),
+        @WritesAttribute(attribute = "smtp.helo", description = "The value used during HELO"),
+        @WritesAttribute(attribute = "smtp.certificate.*.serial", description = "The serial numbers for each of the " +
+                "certificates used by an TLS peer"),
+        @WritesAttribute(attribute = "smtp.certificate.*.subjectName", description = "The subject name for each of the " +
+                "certificates used by an TLS peer"),
+        @WritesAttribute(attribute = "smtp.from", description = "The value used during MAIL FROM (i.e. envelope)"),
+        @WritesAttribute(attribute = "smtp.to", description = "The value used during RCPT TO (i.e. envelope)"),
+        @WritesAttribute(attribute = "smtp.src", description = "The source IP of the SMTP connection")})
+public class ListenSMTP extends AbstractProcessor {
+    public static final String SMTP_HELO = "smtp.helo";
+    public static final String SMTP_FROM = "smtp.from";
+    public static final String SMTP_TO = "smtp.to";
+    public static final String MIME_TYPE = "message/rfc822";
+    public static final String SMTP_SRC_IP = "smtp.src";
+
+
+    protected static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder()
+            .name("SMTP_PORT")
+            .displayName("Listening Port")
+            .description("The TCP port the ListenSMTP processor will bind to. " +
+                    "NOTE that on Unix derivative operating  systems this port must " +
+                    "be higher than 1024 unless NiFi is running as with root user permissions.")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder()
+            .name("SMTP_HOSTNAME")
+            .displayName("SMTP hostname")
+            .description("The hostname to be embedded into the banner displayed when an " +
+                    "SMTP client connects to the processor TCP port .")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor SMTP_MAXIMUM_CONNECTIONS = new PropertyDescriptor.Builder()
+            .name("SMTP_MAXIMUM_CONNECTIONS")
+            .displayName("Maximum number of SMTP connection")
+            .description("The maximum number of simultaneous SMTP connections.")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor SMTP_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("SMTP_TIMEOUT")
+            .displayName("SMTP connection timeout")
+            .description("The maximum time to wait for an action of SMTP client.")
+            .defaultValue("60 seconds")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor SMTP_MAXIMUM_MSG_SIZE = new PropertyDescriptor.Builder()
+            .name("SMTP_MAXIMUM_MSG_SIZE")
+            .displayName("SMTP Maximum Message Size")
+            .description("The maximum number of bytes the server will accept.")
+            .required(true)
+            .defaultValue("20MB")
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+
+    // The value is used as the capacity of a LinkedBlockingQueue, which rejects
+    // zero and negative capacities, hence the positive-integer validator.
+    protected static final PropertyDescriptor SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE = new PropertyDescriptor.Builder()
+            .name("SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE")
+            .displayName("SMTP message buffer length")
+            .description("This property control the size of the Queue utilised by the processor to hold messages as they are processed. " +
+                    "Setting a very small value will decrease the number of emails the processor simultaneously, while setting an very large " +
+                    "queue will result in higher memory and CPU utilisation. The default setting of 1024 is generally a fair number.")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1024")
+            .build();
+
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("SSL_CONTEXT_SERVICE")
+            .displayName("SSL Context Service")
+            .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
+                    "messages will be received over a secure connection.")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
+            .name("CLIENT_AUTH")
+            .displayName("Client Auth")
+            .description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
+            .required(false)
+            .allowableValues(SSLContextService.ClientAuth.NONE.toString(), SSLContextService.ClientAuth.REQUIRED.toString())
+            .build();
+
+    /**
+     * Rejects configurations that select an SSL Context Service without also
+     * choosing a Client Auth policy.
+     *
+     * @param validationContext the context holding the property values to validate
+     * @return the validation failures found; empty when the configuration is acceptable
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
+        final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
+        if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
+            results.add(new ValidationResult.Builder()
+                    .explanation("Client Auth must be provided when using TLS/SSL")
+                    .valid(false).subject("Client Auth").build());
+        }
+
+        return results;
+
+    }
+
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Extraction was successful")
+            .build();
+
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> propertyDescriptors;
+    // Queue filled by the SMTP message handlers and drained by onTrigger
+    private volatile LinkedBlockingQueue<SmtpEvent> incomingMessages;
+
+    private volatile SMTPServer server;
+    private final AtomicBoolean initialized = new AtomicBoolean(false);
+    private final AtomicBoolean stopping = new AtomicBoolean(false);
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    /**
+     * Builds the immutable relationship set and property descriptor list.
+     *
+     * @param context the initialization context (not used)
+     */
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        this.relationships = Collections.unmodifiableSet(relationships);
+
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(SMTP_PORT);
+        props.add(SMTP_HOSTNAME);
+        props.add(SMTP_MAXIMUM_CONNECTIONS);
+        props.add(SMTP_TIMEOUT);
+        props.add(SMTP_MAXIMUM_MSG_SIZE);
+        props.add(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE);
+        props.add(SSL_CONTEXT_SERVICE);
+        props.add(CLIENT_AUTH);
+        this.propertyDescriptors = Collections.unmodifiableList(props);
+
+    }
+
+    /**
+     * Resets the lifecycle flags so that the next {@code onTrigger} call
+     * (re)creates the SMTP server.
+     *
+     * @param context the process context (not used)
+     */
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        initialized.set(false);
+        stopping.set(false);
+    }
+
+    /**
+     * Creates, configures and starts the SMTP server from the processor properties.
+     * Does nothing when the server is already running or a shutdown is in progress.
+     *
+     * @param context the process context the server settings are read from
+     * @throws Exception if the SSL context cannot be created or the server fails to start
+     */
+    protected synchronized void initializeSMTPServer(final ProcessContext context) throws Exception {
+
+        // check if we are already running or if it is stopping
+        if ((initialized.get() && server.isRunning()) || stopping.get()) {
+            return;
+        }
+
+        incomingMessages = new LinkedBlockingQueue<>(context.getProperty(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE).asInteger());
+
+        // If an SSLContextService was provided then create an SSLContext to pass down to the server.
+        // CLIENT_AUTH is documented as "only used if an SSL Context Service is provided", so it is
+        // only consulted inside this branch.
+        SSLContext sslContext = null;
+        boolean clientAuthRequired = false;
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        if (sslContextService != null) {
+            final String clientAuth = context.getProperty(CLIENT_AUTH).getValue();
+            sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth));
+            clientAuthRequired = SSLContextService.ClientAuth.REQUIRED.toString().equals(clientAuth);
+        }
+
+        final SSLContext finalSslContext = sslContext;
+        final boolean requireClientAuth = clientAuthRequired;
+
+        SMTPMessageHandlerFactory smtpMessageHandlerFactory = new SMTPMessageHandlerFactory(incomingMessages, getLogger());
+        final SMTPServer server = new SMTPServer(smtpMessageHandlerFactory) {
+
+            @Override
+            public SSLSocket createSSLSocket(Socket socket) throws IOException {
+                InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress();
+
+                SSLSocketFactory socketFactory = finalSslContext.getSocketFactory();
+
+                SSLSocket s = (SSLSocket) (socketFactory.createSocket(socket, remoteAddress.getHostName(), socket.getPort(), true));
+
+                s.setUseClientMode(false);
+
+                // For some reason the createSSLContext above is not enough to enforce
+                // client side auth
+                // If client auth is required...
+                if (requireClientAuth) {
+                    s.setNeedClientAuth(true);
+                }
+
+                return s;
+            }
+        };
+
+        // Set some parameters to our server
+        server.setSoftwareName("Apache NiFi");
+
+
+        // Set the Server options based on properties
+        server.setPort(context.getProperty(SMTP_PORT).asInteger());
+        server.setHostName(context.getProperty(SMTP_HOSTNAME).getValue());
+        server.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue());
+        server.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger());
+        server.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+
+
+        // Check if TLS should be enabled
+        if (sslContextService != null) {
+            server.setEnableTLS(true);
+        } else {
+            server.setHideTLS(true);
+        }
+
+        // Set TLS to required in case CLIENT_AUTH = required. This is never true without an
+        // SSL Context Service, so TLS cannot end up both hidden and required.
+        if (requireClientAuth) {
+            server.setRequireTLS(true);
+        }
+
+        this.server = server;
+        server.start();
+
+        getLogger().info("Server started and listening on port " + server.getPort());
+
+        initialized.set(true);
+        stopping.set(false);
+    }
+
+    /**
+     * Flags the processor as stopping and stops the SMTP server, if one was created.
+     *
+     * @throws Exception if stopping the server fails
+     */
+    @OnUnscheduled
+    public void startShutdown() throws Exception {
+        if (server != null) {
+            stopping.set(true);
+            getLogger().info("Shutting down processor {}", new Object[]{server});
+            server.stop();
+            getLogger().info("Shut down {}", new Object[]{server});
+        }
+    }
+
+    /**
+     * Clears the stopping flag once the server is no longer running.
+     *
+     * @throws Exception declared for the lifecycle callback; nothing in the body throws a checked exception
+     */
+    @OnStopped
+    public void completeShutdown() throws Exception {
+        if (server != null) {
+            if (!server.isRunning() && stopping.get() ) {
+                stopping.set(false);
+            }
+            getLogger().info("Completed shut down {}", new Object[]{server});
+        }
+    }
+
+    /**
+     * Ensures the SMTP server is running, then converts every queued message into a
+     * flowfile carrying the message bytes as content and the envelope and certificate
+     * details as attributes.
+     *
+     * @param context the process context used to read the processor configuration
+     * @param session the session used to create and transfer the flowfiles
+     * @throws ProcessException if the SMTP server cannot be initialized
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+        try {
+            initializeSMTPServer(context);
+        } catch (Exception e) {
+            context.yield();
+            throw new ProcessException("Failed to initialize the SMTP server", e);
+        }
+
+        // initializeSMTPServer() returns before creating the queue while a shutdown is in
+        // progress, so the queue may not exist yet.
+        final LinkedBlockingQueue<SmtpEvent> queue = incomingMessages;
+        if (queue == null) {
+            return;
+        }
+
+        // Built from the configured values; concatenating the PropertyDescriptor constants
+        // themselves would embed their toString() output rather than the host and port.
+        final String transitUri = "smtp://" + context.getProperty(SMTP_HOSTNAME).getValue()
+                + ":" + context.getProperty(SMTP_PORT).getValue() + "/";
+
+        while (!queue.isEmpty()) {
+            final SmtpEvent message = queue.poll();
+
+            if (message == null) {
+                return;
+            }
+
+            synchronized (message) {
+
+                FlowFile flowfile = session.create();
+
+                final ByteArrayOutputStream messageData = message.getMessageData();
+                if (messageData != null) {
+                    flowfile = session.write(flowfile, new OutputStreamCallback() {
+
+                        // Write the messageData to flowfile content
+                        @Override
+                        public void process(OutputStream out) throws IOException {
+                            out.write(messageData.toByteArray());
+                        }
+                    });
+                }
+
+                final Map<String, String> attributes = new HashMap<>();
+                // Gather message attributes
+                attributes.put(SMTP_HELO, message.getHelo());
+                attributes.put(SMTP_SRC_IP, message.getRemoteIP());
+                attributes.put(SMTP_FROM, message.getFrom());
+                attributes.put(SMTP_TO, message.getTo());
+
+                // Add a selection of each X509 certificates to the already gathered attributes
+                final List<Map<String, String>> details = message.getCertifcateDetails();
+                for (int c = 0; c < details.size(); c++) {
+                    final Map<String, String> detail = details.get(c);
+                    attributes.put("smtp.certificate." + c + ".serial", detail.get("SerialNumber"));
+                    attributes.put("smtp.certificate." + c + ".subjectName", detail.get("SubjectName"));
+                }
+
+                // Set Mime-Type
+                attributes.put(CoreAttributes.MIME_TYPE.key(), MIME_TYPE);
+
+                // Add the attributes. to flowfile
+                flowfile = session.putAllAttributes(flowfile, attributes);
+                session.getProvenanceReporter().receive(flowfile, transitUri);
+                session.transfer(flowfile, REL_SUCCESS);
+                getLogger().info("Transferring {} to success", new Object[]{flowfile});
+
+                // Finished processing,
+                message.setProcessed();
+
+                // update the latch so data() can process the rest of the method
+                message.updateProcessedLatch();
+
+                // End of synchronized block
+            }
+
+            // Wait for SMTPMessageHandler data() and done() to complete
+            // their side of the work (i.e. acknowledgement)
+            // NOTE(review): this spins without sleeping or a timeout; if the handler never
+            // acknowledges, this thread will not leave the loop - confirm that cannot happen.
+            while (!message.getAcknowledged()) {
+                // Busy wait
+            }
+
+            // Lock one last time
+            synchronized (message) {
+                SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode());
+                switch (resultCode) {
+                    case UNEXPECTED_ERROR:
+                    case TIMEOUT_ERROR:
+                        // NOTE(review): rollback() presumably discards everything done in this
+                        // session so far, including earlier messages handled by this loop -
+                        // confirm that is the intended scope.
+                        session.rollback();
+                        getLogger().warn(resultCode.getLogMessage());
+                        // Without this break the failure was also logged as a success
+                        break;
+                    case SUCCESS:
+                        getLogger().info(resultCode.getLogMessage());
+                        break;
+                    default:
+                        getLogger().error(resultCode.getLogMessage());
+                }
+            }
+        }
+    }
+
+    /**
+     * Used for testing to access the port the server is bound to.
+     *
+     * @return the server port, or 0 when no server has been created yet
+     */
+    protected int getPort() {
+        return server == null ? 0 : server.getPort();
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4f672832/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java
new file mode 100644
index 0000000..eaded4a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java
@@ -0,0 +1,134 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.nifi.processors.email.smtp.event;
+
+
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A single message received over SMTP, together with the state used to hand it
+ * over between the SMTP handler and the processor.
+ * <p>
+ * Besides the envelope data (remote IP, HELO, sender, recipient), the raw message
+ * bytes and a summary of the peer certificates, the event carries a "processed"
+ * flag, an "acknowledged" flag, a return code and a latch that is counted down by
+ * {@link #updateProcessedLatch()}.
+ */
+
+public class SmtpEvent{
+    private final String remoteIP;
+    private final String helo;
+    private final String from;
+    private final String to;
+    private final ByteArrayOutputStream messageData;
+    // One map per certificate, holding the "SerialNumber" and "SubjectName" keys
+    private final List<Map<String, String>> certificatesDetails;
+    private final AtomicBoolean processed = new AtomicBoolean(false);
+    private final AtomicBoolean acknowledged = new AtomicBoolean(false);
+    private final AtomicInteger returnCode = new AtomicInteger();
+    private final CountDownLatch processedLatch;
+
+    /**
+     * Creates an event and extracts the serial number and subject name of every
+     * certificate that provides both.
+     *
+     * @param remoteIP       the remote IP stored on the event
+     * @param helo           the HELO value stored on the event
+     * @param from           the sender stored on the event
+     * @param to             the recipient stored on the event
+     * @param certificates   the peer certificates; {@code null} is treated as "no certificates"
+     * @param messageData    the raw message content
+     * @param processedLatch the latch counted down by {@link #updateProcessedLatch()}
+     */
+    public SmtpEvent(
+            final String remoteIP, final String helo, final String from, final String to, final X509Certificate[] certificates,
+            final ByteArrayOutputStream messageData,
+            CountDownLatch processedLatch) {
+
+        this.processedLatch = processedLatch;
+
+        this.remoteIP = remoteIP;
+        this.helo = helo;
+        this.from = from;
+        this.to = to;
+        this.messageData = messageData;
+
+        this.certificatesDetails = new ArrayList<>();
+
+        // A null array means there are no certificates to summarise; iterating it
+        // directly would throw a NullPointerException.
+        if (certificates != null) {
+            for (final X509Certificate cert : certificates) {
+                if (cert.getSerialNumber() != null && cert.getSubjectDN() != null) {
+                    final Map<String, String> certificate = new HashMap<>();
+
+                    certificate.put("SerialNumber", cert.getSerialNumber().toString());
+                    certificate.put("SubjectName", cert.getSubjectDN().getName());
+
+                    certificatesDetails.add(certificate);
+                }
+            }
+        }
+    }
+
+    /**
+     * @return one map per recorded certificate with the keys "SerialNumber" and "SubjectName"
+     */
+    public synchronized List<Map<String, String>> getCertifcateDetails() {
+        return certificatesDetails;
+    }
+
+    public synchronized String getHelo() {
+        return helo;
+    }
+
+    public synchronized ByteArrayOutputStream getMessageData() {
+        return messageData;
+    }
+
+    public synchronized String getFrom() {
+        return from;
+    }
+
+    public synchronized String getTo() {
+        return to;
+    }
+
+    public synchronized String getRemoteIP() {
+        return remoteIP;
+    }
+
+    /** Marks the event as processed. */
+    public synchronized void setProcessed() {
+        this.processed.set(true);
+    }
+
+    public synchronized boolean getProcessed() {
+        return this.processed.get();
+    }
+
+    /** Marks the event as acknowledged. */
+    public synchronized void setAcknowledged() {
+        this.acknowledged.set(true);
+    }
+
+    public synchronized boolean getAcknowledged() {
+        return this.acknowledged.get();
+    }
+
+    /** Counts down the latch supplied at construction time. */
+    public synchronized void updateProcessedLatch() {
+        this.processedLatch.countDown();
+    }
+
+    public synchronized void setReturnCode(int code) {
+        this.returnCode.set(code);
+    }
+
+    public synchronized int getReturnCode() {
+        return this.returnCode.get();
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/4f672832/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java
new file mode 100644
index 0000000..0ac4127
--- /dev/null
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.email.smtp.handler;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.util.StopWatch;
+import org.subethamail.smtp.DropConnectionException;
+import org.subethamail.smtp.MessageContext;
+import org.subethamail.smtp.MessageHandler;
+import org.subethamail.smtp.MessageHandlerFactory;
+import org.subethamail.smtp.RejectException;
+import org.subethamail.smtp.TooMuchDataException;
+import org.subethamail.smtp.server.SMTPServer;
+
+import org.apache.nifi.processors.email.smtp.event.SmtpEvent;
+
+/**
+ * {@link MessageHandlerFactory} that creates one {@link Handler} per message context.
+ * Each handler buffers the incoming message, publishes it as an {@link SmtpEvent} on the
+ * shared queue and then waits for the event's latch before letting the SMTP exchange finish.
+ */
+public class SMTPMessageHandlerFactory implements MessageHandlerFactory {
+    final LinkedBlockingQueue<SmtpEvent> incomingMessages;
+    final ComponentLog logger;
+
+    /**
+     * @param incomingMessages queue on which received messages are offered
+     * @param logger           logger used by the handlers created by this factory
+     */
+    public SMTPMessageHandlerFactory(LinkedBlockingQueue<SmtpEvent> incomingMessages, ComponentLog logger) {
+        this.incomingMessages = incomingMessages;
+        this.logger = logger;
+    }
+
+    @Override
+    public MessageHandler create(MessageContext messageContext) {
+        return new Handler(messageContext, incomingMessages, logger);
+    }
+
+    /**
+     * Handles a single SMTP message exchange: records sender and recipient, buffers the
+     * message body and hands the resulting {@link SmtpEvent} over to the shared queue.
+     */
+    class Handler implements MessageHandler {
+        final MessageContext messageContext;
+        String from;
+        String recipient;
+        ByteArrayOutputStream messageData;
+
+        // Passed to the SmtpEvent; awaited in data() until the consumer of the event releases it.
+        private final CountDownLatch latch;
+
+        /**
+         * The {@code incomingMessages} and {@code logger} arguments are kept for signature
+         * compatibility only; the handler uses the queue and logger of the enclosing factory.
+         */
+        public Handler(MessageContext messageContext, LinkedBlockingQueue<SmtpEvent> incomingMessages, ComponentLog logger){
+            this.messageContext = messageContext;
+            this.latch = new CountDownLatch(1);
+        }
+
+        @Override
+        public void from(String from) throws RejectException {
+            // TODO: possibly whitelist senders?
+            this.from = from;
+        }
+
+        @Override
+        public void recipient(String recipient) throws RejectException {
+            // TODO: possibly whitelist receivers?
+            this.recipient = recipient;
+        }
+
+        /**
+         * Buffers the message body, offers it to the queue and waits for it to be processed.
+         * The whole exchange is bounded by the server's connection timeout.
+         *
+         * @param inputStream stream carrying the message body
+         * @throws TooMuchDataException if the buffered data exceeds the server's maximum message size
+         * @throws RejectException      as a {@link DropConnectionException} when the message could not be
+         *                              queued (421), the wait was interrupted (423) or the message was not
+         *                              processed within the timeout (451)
+         * @throws IOException          if reading the body fails
+         */
+        @Override
+        public void data(InputStream inputStream) throws RejectException, TooMuchDataException, IOException {
+            // Start counting the timer...
+            final StopWatch watch = new StopWatch(true);
+
+            final SMTPServer server = messageContext.getSMTPServer();
+            // The value was previously converted from milliseconds to milliseconds, i.e. used as-is.
+            final long serverTimeout = server.getConnectionTimeout();
+
+            this.messageData = new ByteArrayOutputStream();
+
+            final byte[] buffer = new byte[1024];
+            int rd;
+            while ((rd = inputStream.read(buffer, 0, buffer.length)) != -1) {
+                messageData.write(buffer, 0, rd);
+                if (messageData.getBufferLength() > server.getMaxMessageSize()) {
+                    // NOTE: Setting processed at this stage is not desirable as message object will only be created
+                    // if this test (i.e. message size) passes.
+                    final SMTPResultCode returnCode = SMTPResultCode.fromCode(500);
+                    logger.warn(returnCode.getLogMessage());
+                    throw new TooMuchDataException(returnCode.getErrorMessage());
+                }
+            }
+            messageData.flush();
+
+            X509Certificate[] certificates = new X509Certificate[]{};
+
+            final String remoteIP = messageContext.getRemoteAddress().toString();
+            final String helo = messageContext.getHelo();
+
+            if (messageContext.getTlsPeerCertificates() != null) {
+                certificates = (X509Certificate[]) messageContext.getTlsPeerCertificates().clone();
+            }
+
+            final SmtpEvent message = new SmtpEvent(remoteIP, helo, from, recipient, certificates, messageData, latch);
+
+            // Try to queue the message back to the NiFi session
+            final boolean queued;
+            try {
+                final long remaining = serverTimeout - watch.getElapsed(TimeUnit.MILLISECONDS);
+                queued = incomingMessages.offer(message, remaining, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so code further up the stack can still observe it.
+                Thread.currentThread().interrupt();
+                final SMTPResultCode returnCode = SMTPResultCode.fromCode(421);
+                logger.trace(returnCode.getLogMessage());
+
+                // NOTE: Setting processed at this stage is redundant as this catch deals with the inability of
+                // adding message to the processing queue. Yet, for the sake of consistency the message is
+                // updated nonetheless
+                throw reject(message, returnCode);
+            }
+
+            if (!queued) {
+                // offer() timed out because the queue stayed full: the message never reached the queue,
+                // so waiting on the latch would only end in a misleading timeout error.
+                final SMTPResultCode returnCode = SMTPResultCode.fromCode(421);
+                logger.warn(returnCode.getLogMessage());
+                throw reject(message, returnCode);
+            }
+
+            // Once message has been sent to the queue, it should be processed by NiFi onTrigger,
+            // a flowfile created and its processed status updated before an acknowledgment is
+            // given back to the SMTP client
+            try {
+                final long remaining = serverTimeout - watch.getElapsed(TimeUnit.MILLISECONDS);
+                latch.await(remaining, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                // Latch open unexpectedly. Will return error and request onTrigger to rollback
+                logger.trace("Latch opened unexpectedly and processor indicates data wasn't processed. Returned error to SMTP client as precautionary measure");
+                incomingMessages.remove(message);
+
+                // Set the final values so onTrigger can figure out what happened to message, then inform client
+                throw reject(message, SMTPResultCode.fromCode(423));
+            }
+
+            // Remove the message from the queue.
+            incomingMessages.remove(message);
+            // Check if message is processed and if yes, check if it was received on time and wraps it up.
+            final long elapsed = watch.getElapsed(TimeUnit.MILLISECONDS);
+            if (!message.getProcessed() || (elapsed >= serverTimeout)) {
+                logger.trace("Did not receive the onTrigger response within the acceptable timeframes. Data duplication may have occurred.");
+
+                // Set the final values so onTrigger can figure out what happened to message
+                throw reject(message, SMTPResultCode.fromCode(451));
+            }
+
+            // Set the final values so onTrigger can figure out what happened to message
+            message.setReturnCode(SMTPResultCode.SUCCESS.getCode());
+            message.setAcknowledged();
+            // Exit, allowing Handler to acknowledge the message
+        }
+
+        /**
+         * Records the failure on the event and builds the exception that drops the connection.
+         */
+        private DropConnectionException reject(SmtpEvent message, SMTPResultCode returnCode) {
+            message.setReturnCode(returnCode.getCode());
+            message.setAcknowledged();
+            return new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
+        }
+
+        @Override
+        public void done() {
+            logger.trace("Called the last method of message handler. Exiting");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4f672832/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java
new file mode 100644
index 0000000..5328a0d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.email.smtp.handler;
+
+public enum SMTPResultCode {
+    // This error isn't raised by code. Being just a default value for the
+    // fromCode method below
+    UNKNOWN_ERROR_CODE(0,
+            "Unknown error.",
+            "Failed due to unknown error"),
+
+    SUCCESS (250,
+            "Success delivering message",
+            "Message from {} to {} via {} acknowledgement complete"),
+
+    QUEUE_ERROR (421,
+            "Could not queue the message. Try again",
+            "The SMTP processor has just dropped a message due to the queue being too full, considering increasing the queue size" ),
+
+    UNEXPECTED_ERROR(423,
+            "Unexpected Error. Please try again or contact the administrator in case it persists",
+            "Error hit during delivery of message from {}"),
+
+    TIMEOUT_ERROR (451,
+            "The processing of your message timed-out, we may have received it but you better off sending it again",
+            "Message from {} to {} via {} acknowledgement timeout despite processing completed. Data duplication may occur"),
+
+    MESSAGE_TOO_LARGE(500,
+            "Message rejected due to length/size of data",
+            "Your message exceeds the maximum permitted size");
+
+    private static final SMTPResultCode[] codeArray = new SMTPResultCode[501];
+
+    static {
+        for (final SMTPResultCode smtpResultCode : SMTPResultCode.values()) {
+            codeArray[smtpResultCode.getCode()] = smtpResultCode;
+        }
+    }
+
+    private final int code;
+    private final String errorMessage;
+    private final String logMessage;
+
+    SMTPResultCode(int code, String errorMessage, String logMessage) {
+        this.code = code;
+        this.errorMessage = errorMessage;
+        this.logMessage = logMessage;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+
+    public String getLogMessage() {
+        return logMessage;
+    }
+
+    public static SMTPResultCode fromCode(int code) {
+        final SMTPResultCode smtpResultCode = codeArray[code];
+        return (smtpResultCode == null) ? UNKNOWN_ERROR_CODE : smtpResultCode;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4f672832/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..7a1f644
--- /dev/null
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.email.ExtractEmailAttachments
+org.apache.nifi.processors.email.ExtractEmailHeaders
+org.apache.nifi.processors.email.ListenSMTP

http://git-wip-us.apache.org/repos/asf/nifi/blob/4f672832/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/GenerateAttachment.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/GenerateAttachment.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/GenerateAttachment.java
new file mode 100644
index 0000000..ef100b2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/GenerateAttachment.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.email;
+
+import org.apache.commons.mail.Email;
+import org.apache.commons.mail.EmailAttachment;
+import org.apache.commons.mail.EmailException;
+import org.apache.commons.mail.MultiPartEmail;
+import org.apache.commons.mail.SimpleEmail;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+
+import javax.mail.MessagingException;
+import javax.mail.internet.MimeMessage;
+import java.io.IOException;
+
+/**
+ * Test helper that builds e-mail messages from a fixed set of header values and returns
+ * them serialised as bytes, either as a plain message or with attachments.
+ */
+public class GenerateAttachment {
+    String from;
+    String to;
+    String subject;
+    String message;
+    String hostName;
+
+    public GenerateAttachment(String from, String to, String subject, String message, String hostName) {
+        this.from = from;
+        this.to = to;
+        this.subject = subject;
+        this.message = message;
+        this.hostName = hostName;
+    }
+
+    /**
+     * Builds a message without attachments.
+     *
+     * @return the serialised MIME message
+     * @throws IllegalStateException if the message cannot be built or serialised
+     */
+    public byte[] SimpleEmail() {
+        Email email = new SimpleEmail();
+        try {
+            populate(email);
+            email.buildMimeMessage();
+        } catch (EmailException e) {
+            // Fail here with the real cause instead of a later NullPointerException on getMimeMessage().
+            throw new IllegalStateException("Unable to build simple test email", e);
+        }
+        return toBytes(email);
+    }
+
+    /**
+     * Builds a multipart message carrying {@code amount} copies of {@code pom.xml},
+     * named {@code pom.xml1} .. {@code pom.xml<amount>}.
+     *
+     * @param amount number of attachments to add; values below 1 add none
+     * @return the serialised MIME message
+     * @throws IllegalStateException if the message cannot be built or serialised
+     */
+    public byte[] WithAttachments(int amount) {
+        MultiPartEmail email = new MultiPartEmail();
+        try {
+            populate(email);
+
+            for (int x = 1; x <= amount; x++) {
+                // Create an attachment with the pom.xml being used to compile (yay!!!)
+                EmailAttachment attachment = new EmailAttachment();
+                attachment.setPath("pom.xml");
+                attachment.setDisposition(EmailAttachment.ATTACHMENT);
+                attachment.setDescription("pom.xml");
+                attachment.setName("pom.xml" + x);
+                //  attach
+                email.attach(attachment);
+            }
+            email.buildMimeMessage();
+        } catch (EmailException e) {
+            throw new IllegalStateException("Unable to build test email with " + amount + " attachment(s)", e);
+        }
+        return toBytes(email);
+    }
+
+    /** Applies the configured sender, recipient, subject, body and host name to the given email. */
+    private void populate(Email email) throws EmailException {
+        email.setFrom(from);
+        email.addTo(to);
+        email.setSubject(subject);
+        email.setMsg(message);
+        email.setHostName(hostName);
+    }
+
+    /** Serialises the already built MIME message of the given email into a byte array. */
+    private byte[] toBytes(Email email) {
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        MimeMessage mimeMessage = email.getMimeMessage();
+        try {
+            mimeMessage.writeTo(output);
+        } catch (IOException | MessagingException e) {
+            // A partially written message would make the calling test misleading; fail loudly instead.
+            throw new IllegalStateException("Unable to serialise test email", e);
+        }
+        return output.toByteArray();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4f672832/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestExtractEmailAttachments.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestExtractEmailAttachments.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestExtractEmailAttachments.java
new file mode 100644
index 0000000..ee629a6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestExtractEmailAttachments.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.email;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+
+/**
+ * Tests for {@link ExtractEmailAttachments}, driven by messages produced with
+ * {@link GenerateAttachment}.
+ */
+public class TestExtractEmailAttachments {
+    // Header values shared by every generated message
+    String from = "Alice <alice@nifi.apache.org>";
+    String to = "bob@nifi.apache.org";
+    String subject = "Just a test email";
+    String message = "Test test test chocolate";
+    String hostName = "bermudatriangle";
+
+    GenerateAttachment attachmentGenerator = new GenerateAttachment(from, to, subject, message, hostName);
+
+
+    /** A message with one attachment yields the original plus one attachment flowfile. */
+    @Test
+    public void testValidEmailWithAttachments() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new ExtractEmailAttachments());
+
+        // Build the input on the fly and feed it to the processor
+        testRunner.enqueue(attachmentGenerator.WithAttachments(1));
+        testRunner.run();
+
+        testRunner.assertTransferCount(ExtractEmailAttachments.REL_ORIGINAL, 1);
+        testRunner.assertTransferCount(ExtractEmailAttachments.REL_FAILURE, 0);
+        testRunner.assertTransferCount(ExtractEmailAttachments.REL_ATTACHMENTS, 1);
+
+        // Inspect the extracted attachment
+        final List<MockFlowFile> attachments = testRunner.getFlowFilesForRelationship(ExtractEmailAttachments.REL_ATTACHMENTS);
+        final MockFlowFile first = attachments.get(0);
+        first.assertAttributeEquals("filename", "pom.xml1");
+    }
+
+    /** A message with a random number (1 to 10) of attachments yields that many attachment flowfiles. */
+    @Test
+    public void testValidEmailWithMultipleAttachments() throws Exception {
+        Random rnd = new Random();
+        final TestRunner testRunner = TestRunners.newTestRunner(new ExtractEmailAttachments());
+
+        // Build the input on the fly and feed it to the processor
+        final int amount = rnd.nextInt(10) + 1;
+        testRunner.enqueue(attachmentGenerator.WithAttachments(amount));
+        testRunner.run();
+
+        testRunner.assertTransferCount(ExtractEmailAttachments.REL_ORIGINAL, 1);
+        testRunner.assertTransferCount(ExtractEmailAttachments.REL_FAILURE, 0);
+        testRunner.assertTransferCount(ExtractEmailAttachments.REL_ATTACHMENTS, amount);
+
+        // Collect the names of the extracted attachments
+        final List<MockFlowFile> attachments = testRunner.getFlowFilesForRelationship(ExtractEmailAttachments.REL_ATTACHMENTS);
+        final List<String> names = new ArrayList<>();
+        int index = 0;
+        while (index < amount) {
+            names.add(attachments.get(index).getAttribute("filename").toString());
+            index++;
+        }
+
+        // The first and the last generated attachment names must both be present
+        final List<String> expected = Arrays.asList("pom.xml1", "pom.xml" + amount);
+        Assert.assertTrue(names.containsAll(expected));
+    }
+
+    /** A message without attachments yields only the original flowfile. */
+    @Test
+    public void testValidEmailWithoutAttachments() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new ExtractEmailAttachments());
+
+        // Build the input on the fly and feed it to the processor
+        testRunner.enqueue(attachmentGenerator.SimpleEmail());
+        testRunner.run();
+
+        testRunner.assertTransferCount(ExtractEmailAttachments.REL_ORIGINAL, 1);
+        testRunner.assertTransferCount(ExtractEmailAttachments.REL_FAILURE, 0);
+        testRunner.assertTransferCount(ExtractEmailAttachments.REL_ATTACHMENTS, 0);
+
+    }
+
+    /** Content that is not an e-mail is routed to failure and nothing else. */
+    @Test
+    public void testInvalidEmail() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new ExtractEmailAttachments());
+        final byte[] notAnEmail = "test test test chocolate".getBytes();
+        testRunner.enqueue(notAnEmail);
+        testRunner.run();
+
+        testRunner.assertTransferCount(ExtractEmailAttachments.REL_ORIGINAL, 0);
+        testRunner.assertTransferCount(ExtractEmailAttachments.REL_FAILURE, 1);
+        testRunner.assertTransferCount(ExtractEmailAttachments.REL_ATTACHMENTS, 0);
+    }
+}
\ No newline at end of file


Mime
View raw message