nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alopre...@apache.org
Subject [14/17] nifi git commit: NIFI-3724 - Initial commit of Parquet bundle with PutParquet and FetchParquet - Creating nifi-records-utils to share utility code from record services - Refactoring Parquet tests to use MockRecorderParser and MockRecordWriter - R
Date Mon, 01 May 2017 20:12:07 GMT
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
deleted file mode 100644
index 65b11ff..0000000
--- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
+++ /dev/null
@@ -1,575 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.put;
-
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processor.util.put.sender.ChannelSender;
-import org.apache.nifi.processor.util.put.sender.DatagramChannelSender;
-import org.apache.nifi.processor.util.put.sender.SSLSocketChannelSender;
-import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
-import org.apache.nifi.ssl.SSLContextService;
-
-import javax.net.ssl.SSLContext;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A base class for processors that send data to an external system using TCP or UDP.
- */
-public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryProcessor {
-
-    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
-            .name("Hostname")
-            .description("The ip address or hostname of the destination.")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .defaultValue("localhost")
-            .required(true)
-            .expressionLanguageSupported(true)
-            .build();
-    public static final PropertyDescriptor PORT = new PropertyDescriptor
-            .Builder().name("Port")
-            .description("The port on the destination.")
-            .required(true)
-            .addValidator(StandardValidators.PORT_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
-    public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
-            .name("Max Size of Socket Send Buffer")
-            .description("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System " +
-                    "to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " +
-                    "the data can be read, and incoming data will be dropped.")
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .defaultValue("1 MB")
-            .required(true)
-            .build();
-    public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor
-            .Builder().name("Idle Connection Expiration")
-            .description("The amount of time a connection should be held open without being used before closing the connection.")
-            .required(true)
-            .defaultValue("5 seconds")
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .build();
-
-    // Putting these properties here so sub-classes don't have to redefine them, but they are
-    // not added to the properties by default since not all processors may need them
-
-    public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", "TCP");
-    public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", "UDP");
-
-    public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
-            .Builder().name("Protocol")
-            .description("The protocol for communication.")
-            .required(true)
-            .allowableValues(TCP_VALUE, UDP_VALUE)
-            .defaultValue(TCP_VALUE.getValue())
-            .build();
-    public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
-            .name("Message Delimiter")
-            .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
-                    + "If not specified, the entire content of the FlowFile will be used as a single message. "
-                    + "If specified, the contents of the FlowFile will be split on this delimiter and each section "
-                    + "sent as a separate message. Note that if messages are delimited and some messages for a given FlowFile "
-                    + "are transferred successfully while others are not, the messages will be split into individual FlowFiles, such that those "
-                    + "messages that were successfully sent are routed to the 'success' relationship while other messages are sent to the 'failure' "
-                    + "relationship.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
-    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
-            .name("Character Set")
-            .description("Specifies the character set of the data being sent.")
-            .required(true)
-            .defaultValue("UTF-8")
-            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
-            .name("Timeout")
-            .description("The timeout for connecting to and communicating with the destination. Does not apply to UDP")
-            .required(false)
-            .defaultValue("10 seconds")
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor OUTGOING_MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
-            .name("Outgoing Message Delimiter")
-            .description("Specifies the delimiter to use when sending messages out over the same TCP stream. The delimiter is appended to each FlowFile message "
-                    + "that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. Users should "
-                    + "ensure that the FlowFile content does not contain the delimiter character to avoid errors. In order to use a new line character you can "
-                    + "enter '\\n'. For a tab character use '\\t'. Finally for a carriage return use '\\r'.")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .defaultValue("\\n")
-            .expressionLanguageSupported(true)
-            .build();
-    public static final PropertyDescriptor CONNECTION_PER_FLOWFILE = new PropertyDescriptor.Builder()
-            .name("Connection Per FlowFile")
-            .description("Specifies whether to send each FlowFile's content on an individual connection.")
-            .required(true)
-            .defaultValue("false")
-            .allowableValues("true", "false")
-            .build();
-
-    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
-            .name("SSL Context Service")
-            .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
-                    "messages will be sent over a secure connection.")
-            .required(false)
-            .identifiesControllerService(SSLContextService.class)
-            .build();
-
-    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("FlowFiles that are sent successfully to the destination are sent out this relationship.")
-            .build();
-    public static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("FlowFiles that failed to send to the destination are sent out this relationship.")
-            .build();
-
-    // Relationships and property descriptors exposed by this processor; both are assigned in init().
-    private Set<Relationship> relationships;
-    private List<PropertyDescriptor> descriptors;
-
-    // Transit URI handed to the provenance reporter; assigned in onScheduled() from createTransitUri().
-    protected volatile String transitUri;
-    // Pool of reusable senders; re-created in onScheduled() with a capacity of getMaxConcurrentTasks().
-    protected volatile BlockingQueue<ChannelSender> senderPool;
-
-    // Batches whose messages have all been accounted for; a FlowFileMessageBatch adds itself here on completion.
-    protected final BlockingQueue<FlowFileMessageBatch> completeBatches = new LinkedBlockingQueue<>();
-    // Batches still in flight; a FlowFileMessageBatch removes itself from this set on completion.
-    protected final Set<FlowFileMessageBatch> activeBatches = Collections.synchronizedSet(new HashSet<FlowFileMessageBatch>());
-
-    /**
-     * Builds the immutable property and relationship collections for this processor:
-     * the four base properties and two base relationships, each followed by whatever
-     * the sub-class contributes through the getAdditional* hooks.
-     *
-     * @param context the initialization context (not used by this implementation)
-     */
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        // base properties first, sub-class additions after them
-        final List<PropertyDescriptor> props = new ArrayList<>();
-        Collections.addAll(props, HOSTNAME, PORT, MAX_SOCKET_SEND_BUFFER_SIZE, IDLE_EXPIRATION);
-        props.addAll(getAdditionalProperties());
-        descriptors = Collections.unmodifiableList(props);
-
-        // success and failure are always present; sub-classes may add more
-        final Set<Relationship> rels = new HashSet<>();
-        Collections.addAll(rels, REL_SUCCESS, REL_FAILURE);
-        rels.addAll(getAdditionalRelationships());
-        relationships = Collections.unmodifiableSet(rels);
-    }
-
-    /**
-     * Override to provide additional relationships for the processor.
-     * The default implementation contributes none.
-     *
-     * @return a list of relationships to add to the base ones; empty by default
-     */
-    protected List<Relationship> getAdditionalRelationships() {
-        // emptyList() is the type-safe form of the raw EMPTY_LIST constant (no unchecked conversion)
-        return Collections.emptyList();
-    }
-
-    /**
-     * Override to provide additional properties for the processor.
-     * The default implementation contributes none.
-     *
-     * @return a list of properties to add after the base ones; empty by default
-     */
-    protected List<PropertyDescriptor> getAdditionalProperties() {
-        // emptyList() is the type-safe form of the raw EMPTY_LIST constant (no unchecked conversion)
-        return Collections.emptyList();
-    }
-
-    /**
-     * @return the unmodifiable set of relationships assembled in init()
-     */
-    @Override
-    public final Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    /**
-     * @return the unmodifiable list of property descriptors assembled in init()
-     */
-    @Override
-    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return this.descriptors;
-    }
-
-    /**
-     * Prepares per-run state when the processor is scheduled: a fresh, empty sender pool
-     * and the transit URI supplied by the sub-class.
-     *
-     * @param context the current context
-     * @throws IOException declared by the signature; nothing in this implementation throws it directly
-     */
-    @OnScheduled
-    public void onScheduled(final ProcessContext context) throws IOException {
-        // one pool slot per concurrent task; the senders themselves get created on the fly in onTrigger
-        final int poolCapacity = context.getMaxConcurrentTasks();
-        senderPool = new LinkedBlockingQueue<>(poolCapacity);
-        transitUri = createTransitUri(context);
-    }
-
-    /**
-     * Drains the sender pool when the processor is stopped, closing every sender found in it.
-     * Does nothing if the pool was never created.
-     */
-    @OnStopped
-    public void closeSenders() {
-        if (senderPool == null) {
-            return;
-        }
-
-        // poll() returns null once the pool is empty, which ends the loop
-        for (ChannelSender pooled = senderPool.poll(); pooled != null; pooled = senderPool.poll()) {
-            pooled.close();
-        }
-    }
-
-    /**
-     * Sub-classes construct a transit uri for provenance events. Called from @OnScheduled
-     * method of this class.
-     *
-     * @param context the current context
-     *
-     * @return the transit uri
-     */
-    protected abstract String createTransitUri(final ProcessContext context);
-
-    /**
-     * Sub-classes create a ChannelSender given a context.
-     *
-     * @param context the current context
-     * @return an implementation of ChannelSender
-     * @throws IOException if an error occurs creating the ChannelSender
-     */
-    protected abstract ChannelSender createSender(final ProcessContext context) throws IOException;
-
-    /**
-     * Closes every pooled sender that has not been used within the given threshold and
-     * returns the remaining senders to the pool.
-     *
-     * @param idleThreshold the threshold, in milliseconds, after which a sender is considered idle
-     */
-    protected void pruneIdleSenders(final long idleThreshold) {
-        final long now = System.currentTimeMillis();
-        final List<ChannelSender> survivors = new ArrayList<>();
-
-        // drain the pool completely, closing idle senders and remembering the rest
-        for (ChannelSender candidate = senderPool.poll(); candidate != null; candidate = senderPool.poll()) {
-            final boolean idle = now > (candidate.getLastUsed() + idleThreshold);
-            if (!idle) {
-                survivors.add(candidate);
-                continue;
-            }
-            getLogger().debug("Closing idle connection...");
-            candidate.close();
-        }
-
-        // hand the survivors back; any that no longer fit in the pool are closed instead
-        for (final ChannelSender survivor : survivors) {
-            if (!senderPool.offer(survivor)) {
-                survivor.close();
-            }
-        }
-    }
-
-    /**
-     * Helper for sub-classes to create a sender.
-     *
-     * @param protocol the protocol for the sender
-     * @param host the host to send to
-     * @param port the port to send to
-     * @param timeout the timeout for connecting and communicating over the channel
-     * @param maxSendBufferSize the maximum size of the socket send buffer
-     * @param sslContext an SSLContext, or null if not using SSL
-     *
-     * @return a ChannelSender based on the given properties
-     *
-     * @throws IOException if an error occurs creating the sender
-     */
-    protected ChannelSender createSender(final String protocol,
-                                         final String host,
-                                         final int port,
-                                         final int timeout,
-                                         final int maxSendBufferSize,
-                                         final SSLContext sslContext) throws IOException {
-
-        ChannelSender sender;
-        if (protocol.equals(UDP_VALUE.getValue())) {
-            sender = new DatagramChannelSender(host, port, maxSendBufferSize, getLogger());
-        } else {
-            // if an SSLContextService is provided then we make a secure sender
-            if (sslContext != null) {
-                sender = new SSLSocketChannelSender(host, port, maxSendBufferSize, sslContext, getLogger());
-            } else {
-                sender = new SocketChannelSender(host, port, maxSendBufferSize, getLogger());
-            }
-        }
-
-        sender.setTimeout(timeout);
-        sender.open();
-        return sender;
-    }
-
-    /**
-     * Helper method to acquire an available ChannelSender from the pool. If the pool is empty then the a new sender is created.
-     *
-     * @param context
-     *            - the current process context.
-     *
-     * @param session
-     *            - the current process session.
-     * @param flowFile
-     *            - the FlowFile being processed in this session.
-     *
-     * @return ChannelSender - the sender that has been acquired or null if no sender is available and a new sender cannot be created.
-     */
-    protected ChannelSender acquireSender(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
-        ChannelSender sender = senderPool.poll();
-        if (sender == null) {
-            try {
-                getLogger().debug("No available connections, creating a new one...");
-                sender = createSender(context);
-            } catch (IOException e) {
-                getLogger().error("No available connections, and unable to create a new one, transferring {} to failure",
-                        new Object[]{flowFile}, e);
-                session.transfer(flowFile, REL_FAILURE);
-                session.commit();
-                context.yield();
-                sender = null;
-            }
-        }
-
-        return sender;
-    }
-
-
-    /**
-     * Helper method to relinquish the ChannelSender back to the pool. If the sender is disconnected or the pool is full
-     * then the sender is closed and discarded.
-     *
-     * @param sender the sender to return or close
-     */
-    protected void relinquishSender(final ChannelSender sender) {
-        if (sender != null) {
-            // if the connection is still open then then try to return the sender to the pool.
-            if (sender.isConnected()) {
-                boolean returned = senderPool.offer(sender);
-                // if the pool is full then close the sender.
-                if (!returned) {
-                    sender.close();
-                }
-            } else {
-                // probably already closed here, but quietly close anyway to be safe.
-                sender.close();
-            }
-        }
-    }
-
-    /**
-     * Represents a range of messages from a FlowFile as a pair of long offsets.
-     * FlowFileMessageBatch passes these to session.clone as (start, end - start).
-     */
-    protected static class Range {
-        private final long start;
-        private final long end;
-
-        public Range(final long start, final long end) {
-            this.end = end;
-            this.start = start;
-        }
-
-        /** @return the start offset of this range */
-        public long getStart() {
-            return this.start;
-        }
-
-        /** @return the end offset of this range */
-        public long getEnd() {
-            return this.end;
-        }
-
-        @Override
-        public String toString() {
-            return new StringBuilder("Range[").append(start).append("-").append(end).append("]").toString();
-        }
-    }
-
-    /**
-     * A wrapper to hold the ranges of a FlowFile that were successful and ranges that failed, and then
-     * transfer those ranges appropriately.
-     */
-    protected class FlowFileMessageBatch {
-
-        // Session and FlowFile this batch reports on; the session is committed or rolled back by this class.
-        private final ProcessSession session;
-        private final FlowFile flowFile;
-        // Captured at construction; used with completeTime to compute the transfer duration.
-        private final long startTime = System.nanoTime();
-
-        // Ranges reported via addSuccessfulRange / addFailedRange.
-        private final List<Range> successfulRanges = new ArrayList<>();
-        private final List<Range> failedRanges = new ArrayList<>();
-
-        // Exception passed to the most recent addFailedRange call; only used in log messages.
-        private Exception lastFailureReason;
-        // -1 means the total message count has not been set yet (see isComplete).
-        private long numMessages = -1L;
-        // System.nanoTime() at the moment the batch was detected as complete.
-        private long completeTime = 0L;
-        // Once true, further range reports and completeSession() are ignored.
-        private boolean canceled = false;
-
-        /**
-         * @param session the session that owns the FlowFile
-         * @param flowFile the FlowFile whose messages are being sent
-         */
-        public FlowFileMessageBatch(final ProcessSession session, final FlowFile flowFile) {
-            this.session = session;
-            this.flowFile = flowFile;
-        }
-
-        /**
-         * Finishes the batch one way or the other: completes the session when every message has
-         * been accounted for, otherwise marks the batch canceled, rolls the session back and
-         * discards the ranges recorded so far.
-         */
-        public synchronized void cancelOrComplete() {
-            if (!isComplete()) {
-                this.canceled = true;
-
-                session.rollback();
-                successfulRanges.clear();
-                failedRanges.clear();
-                return;
-            }
-
-            completeSession();
-        }
-
-        /**
-         * Records a range that was sent successfully; ignored once the batch is canceled.
-         * If this report completes the batch, the batch moves from activeBatches to completeBatches.
-         *
-         * @param start start offset of the range
-         * @param end end offset of the range
-         */
-        public synchronized void addSuccessfulRange(final long start, final long end) {
-            if (!canceled) {
-                successfulRanges.add(new Range(start, end));
-
-                if (isComplete()) {
-                    activeBatches.remove(this);
-                    completeBatches.add(this);
-                    completeTime = System.nanoTime();
-                }
-            }
-        }
-
-        /**
-         * Records a range that failed to send, remembering the exception as the last failure
-         * reason; ignored once the batch is canceled. If this report completes the batch, the
-         * batch moves from activeBatches to completeBatches.
-         *
-         * @param start start offset of the range
-         * @param end end offset of the range
-         * @param e the exception that caused the failure
-         */
-        public synchronized void addFailedRange(final long start, final long end, final Exception e) {
-            if (!canceled) {
-                failedRanges.add(new Range(start, end));
-                lastFailureReason = e;
-
-                if (isComplete()) {
-                    activeBatches.remove(this);
-                    completeBatches.add(this);
-                    completeTime = System.nanoTime();
-                }
-            }
-        }
-
-        private boolean isComplete() {
-            return !canceled && (numMessages > -1) && (successfulRanges.size() + failedRanges.size() >= numMessages);
-        }
-
-        /**
-         * Sets the total number of messages expected for this batch. If enough ranges have
-         * already been reported, the batch moves from activeBatches to completeBatches.
-         *
-         * @param msgCount the total number of messages in the batch
-         */
-        public synchronized void setNumMessages(final long msgCount) {
-            numMessages = msgCount;
-
-            if (!isComplete()) {
-                return;
-            }
-
-            activeBatches.remove(this);
-            completeBatches.add(this);
-            completeTime = System.nanoTime();
-        }
-
-        /**
-         * Sorts the given ranges by start offset, merges runs of contiguous ranges (next start
-         * equals current end), clones one child FlowFile per merged run and transfers it to the
-         * given relationship. Children sent to success get a provenance send event; all others
-         * are penalized first. The supplied list is sorted in place.
-         *
-         * @param ranges the ranges to transfer
-         * @param relationship the relationship to transfer the child FlowFiles to
-         */
-        private void transferRanges(final List<Range> ranges, final Relationship relationship) {
-            Collections.sort(ranges, new Comparator<Range>() {
-                @Override
-                public int compare(final Range left, final Range right) {
-                    return Long.compare(left.getStart(), right.getStart());
-                }
-            });
-
-            int idx = 0;
-            while (idx < ranges.size()) {
-                final Range first = ranges.get(idx++);
-                final long mergedStart = first.getStart();
-                long mergedEnd = first.getEnd();
-                int count = 1;
-
-                // absorb every following range that continues exactly where the merged run ends
-                while (idx < ranges.size() && ranges.get(idx).getStart() == mergedEnd) {
-                    mergedEnd = ranges.get(idx).getEnd();
-                    count++;
-                    idx++;
-                }
-
-                // Create a FlowFile for the merged run.
-                final FlowFile child = session.clone(flowFile, mergedStart, mergedEnd - mergedStart);
-                if (relationship != REL_SUCCESS) {
-                    session.transfer(session.penalize(child), relationship);
-                    continue;
-                }
-
-                session.getProvenanceReporter().send(child, transitUri, "Sent " + count + " messages");
-                session.transfer(child, relationship);
-            }
-        }
-
-        /**
-         * Routes the FlowFile according to the recorded ranges and commits the session.
-         * Does nothing when the batch was canceled.
-         *
-         * No ranges at all: FlowFile goes to success. Only failures: FlowFile is penalized and
-         * goes to failure. Only successes: a provenance send event is reported and the FlowFile
-         * goes to success. A mix: the FlowFile is split into children per range, routed to
-         * success or failure respectively, and the original is removed.
-         */
-        public synchronized void completeSession() {
-            if (canceled) {
-                return;
-            }
-
-            final boolean noSuccesses = successfulRanges.isEmpty();
-            final boolean noFailures = failedRanges.isEmpty();
-
-            if (noSuccesses && noFailures) {
-                getLogger().info("Completed processing {} but sent 0 FlowFiles", new Object[] {flowFile});
-                session.transfer(flowFile, REL_SUCCESS);
-            } else if (noSuccesses) {
-                getLogger().error("Failed to send {}; routing to 'failure'; last failure reason reported was {};", new Object[] {flowFile, lastFailureReason});
-                final FlowFile penalizedFlowFile = session.penalize(flowFile);
-                session.transfer(penalizedFlowFile, REL_FAILURE);
-            } else if (noFailures) {
-                final long transferMillis = TimeUnit.NANOSECONDS.toMillis(completeTime - startTime);
-                session.getProvenanceReporter().send(flowFile, transitUri, "Sent " + successfulRanges.size() + " messages;", transferMillis);
-                session.transfer(flowFile, REL_SUCCESS);
-                getLogger().info("Successfully sent {} messages for {} in {} millis", new Object[] {successfulRanges.size(), flowFile, transferMillis});
-            } else {
-                // Some messages made it and some did not: split the source FlowFile into children,
-                // sending the successful messages to 'success' and the failed messages to 'failure'.
-                transferRanges(successfulRanges, REL_SUCCESS);
-                transferRanges(failedRanges, REL_FAILURE);
-                session.remove(flowFile);
-                getLogger().error("Successfully sent {} messages, but failed to send {} messages; the last error received was {}",
-                        new Object[] {successfulRanges.size(), failedRanges.size(), lastFailureReason});
-            }
-
-            // every non-canceled outcome ends with a commit
-            session.commit();
-        }
-    }
-
-    /**
-     * Gets the current value of the "Outgoing Message Delimiter" property and parses the special characters.
-     *
-     * @param context
-     *            - the current process context.
-     * @param flowFile
-     *            - the FlowFile being processed.
-     *
-     * @return String containing the Delimiter value.
-     */
-    protected String getOutgoingMessageDelimiter(final ProcessContext context, final FlowFile flowFile) {
-        String delimiter = context.getProperty(OUTGOING_MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
-        if (delimiter != null) {
-            delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
-        }
-        return delimiter;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java
deleted file mode 100644
index 278a9ab..0000000
--- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.put.sender;
-
-import org.apache.nifi.logging.ComponentLog;
-import java.io.IOException;
-import java.nio.charset.Charset;
-
-/**
- * Common parent for classes that send messages to a host and port over a channel.
- * Sub-classes supply how the channel is opened, written to, checked and closed.
- */
-public abstract class ChannelSender {
-
-    protected final int port;
-    protected final String host;
-    protected final int maxSendBufferSize;
-    protected final ComponentLog logger;
-
-    protected volatile int timeout = 10000;
-    protected volatile long lastUsed;
-
-    public ChannelSender(final String host, final int port, final int maxSendBufferSize, final ComponentLog logger) {
-        this.host = host;
-        this.port = port;
-        this.maxSendBufferSize = maxSendBufferSize;
-        this.logger = logger;
-    }
-
-    public int getTimeout() {
-        return this.timeout;
-    }
-
-    public void setTimeout(int timeout) {
-        this.timeout = timeout;
-    }
-
-    /**
-     * @return the time, as System.currentTimeMillis(), of the last successful send over this channel
-     */
-    public long getLastUsed() {
-        return this.lastUsed;
-    }
-
-    /**
-     * Opens the connection to the destination.
-     *
-     * @throws IOException if an error occurred opening the connection.
-     */
-    public abstract void open() throws IOException;
-
-    /**
-     * Encodes the given string with the given charset and sends the resulting bytes over the channel.
-     *
-     * @param message the message to send over the channel
-     * @param charset the character set used to encode the message
-     * @throws IOException if there was an error communicating over the channel
-     */
-    public void send(final String message, final Charset charset) throws IOException {
-        send(message.getBytes(charset));
-    }
-
-    /**
-     * Sends the given data over the channel and records the time of the send.
-     * If the write fails the channel is closed before the exception propagates.
-     *
-     * @param data the data to send over the channel
-     * @throws IOException if there was an error communicating over the channel
-     */
-    public void send(final byte[] data) throws IOException {
-        try {
-            write(data);
-        } catch (final IOException e) {
-            // failed to send data over the channel, we close it to force
-            // the creation of a new one next time
-            close();
-            throw e;
-        }
-        lastUsed = System.currentTimeMillis();
-    }
-
-    /**
-     * Write the given buffer to the underlying channel.
-     *
-     * @param data the bytes to write
-     * @throws IOException if the write fails
-     */
-    protected abstract void write(byte[] data) throws IOException;
-
-    /**
-     * @return true if the underlying channel is connected
-     */
-    public abstract boolean isConnected();
-
-    /**
-     * Close the underlying channel
-     */
-    public abstract void close();
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java
deleted file mode 100644
index 0b2dfb8..0000000
--- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.put.sender;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.logging.ComponentLog;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.StandardSocketOptions;
-import java.nio.ByteBuffer;
-import java.nio.channels.DatagramChannel;
-
-/**
- * Sends messages over a DatagramChannel.
- */
-public class DatagramChannelSender extends ChannelSender {
-
-    private DatagramChannel channel;
-
-    public DatagramChannelSender(final String host, final int port, final int maxSendBufferSize, final ComponentLog logger) {
-        super(host, port, maxSendBufferSize, logger);
-    }
-
-    @Override
-    public void open() throws IOException {
-        if (channel == null) {
-            channel = DatagramChannel.open();
-
-            if (maxSendBufferSize > 0) {
-                channel.setOption(StandardSocketOptions.SO_SNDBUF, maxSendBufferSize);
-                final int actualSendBufSize = channel.getOption(StandardSocketOptions.SO_SNDBUF);
-                if (actualSendBufSize < maxSendBufferSize) {
-                    logger.warn("Attempted to set Socket Send Buffer Size to " + maxSendBufferSize
-                            + " bytes but could only set to " + actualSendBufSize + "bytes. You may want to "
-                            + "consider changing the Operating System's maximum receive buffer");
-                }
-            }
-        }
-
-        if (!channel.isConnected()) {
-            channel.connect(new InetSocketAddress(InetAddress.getByName(host), port));
-        }
-    }
-
-    @Override
-    protected void write(byte[] data) throws IOException {
-        ByteBuffer buffer = ByteBuffer.wrap(data);
-        while (buffer.hasRemaining()) {
-            channel.write(buffer);
-        }
-    }
-
-    @Override
-    public boolean isConnected() {
-        return channel != null && channel.isConnected();
-    }
-
-    @Override
-    public void close() {
-        IOUtils.closeQuietly(channel);
-        channel = null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
deleted file mode 100644
index a70c9c5..0000000
--- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.put.sender;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-
-import javax.net.ssl.SSLContext;
-import java.io.IOException;
-
-/**
- * Sends messages over an SSLSocketChannel.
- */
-public class SSLSocketChannelSender extends SocketChannelSender {
-
-    private SSLSocketChannel sslChannel;
-    private SSLContext sslContext;
-
-    public SSLSocketChannelSender(final String host,
-                                  final int port,
-                                  final int maxSendBufferSize,
-                                  final SSLContext sslContext,
-                                  final ComponentLog logger) {
-        super(host, port, maxSendBufferSize, logger);
-        this.sslContext = sslContext;
-    }
-
-    @Override
-    public void open() throws IOException {
-        if (sslChannel == null) {
-            super.open();
-            sslChannel = new SSLSocketChannel(sslContext, channel, true);
-        }
-        sslChannel.setTimeout(timeout);
-
-        // SSLSocketChannel will check if already connected so we can safely call this
-        sslChannel.connect();
-    }
-
-    @Override
-    protected void write(byte[] data) throws IOException {
-        sslChannel.write(data);
-    }
-
-    @Override
-    public boolean isConnected() {
-        return sslChannel != null && !sslChannel.isClosed();
-    }
-
-    @Override
-    public void close() {
-        super.close();
-        IOUtils.closeQuietly(sslChannel);
-        sslChannel = null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
deleted file mode 100644
index 8d4f875..0000000
--- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.put.sender;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.net.StandardSocketOptions;
-import java.nio.channels.SocketChannel;
-
-/**
- * Sends messages over a SocketChannel.
- */
-public class SocketChannelSender extends ChannelSender {
-
-    protected SocketChannel channel;
-    protected SocketChannelOutputStream socketChannelOutput;
-
-    public SocketChannelSender(final String host, final int port, final int maxSendBufferSize, final ComponentLog logger) {
-        super(host, port, maxSendBufferSize, logger);
-    }
-
-    @Override
-    public void open() throws IOException {
-        if (channel == null) {
-            channel = SocketChannel.open();
-            channel.configureBlocking(false);
-
-            if (maxSendBufferSize > 0) {
-                channel.setOption(StandardSocketOptions.SO_SNDBUF, maxSendBufferSize);
-                final int actualSendBufSize = channel.getOption(StandardSocketOptions.SO_SNDBUF);
-                if (actualSendBufSize < maxSendBufferSize) {
-                    logger.warn("Attempted to set Socket Send Buffer Size to " + maxSendBufferSize
-                            + " bytes but could only set to " + actualSendBufSize + "bytes. You may want to "
-                            + "consider changing the Operating System's maximum send buffer");
-                }
-            }
-        }
-
-        if (!channel.isConnected()) {
-            final long startTime = System.currentTimeMillis();
-            final InetSocketAddress socketAddress = new InetSocketAddress(InetAddress.getByName(host), port);
-
-            if (!channel.connect(socketAddress)) {
-                while (!channel.finishConnect()) {
-                    if (System.currentTimeMillis() > startTime + timeout) {
-                        throw new SocketTimeoutException("Timed out connecting to " + host + ":" + port);
-                    }
-
-                    try {
-                        Thread.sleep(50L);
-                    } catch (final InterruptedException e) {
-                    }
-                }
-            }
-
-            socketChannelOutput = new SocketChannelOutputStream(channel);
-            socketChannelOutput.setTimeout(timeout);
-        }
-    }
-
-    @Override
-    protected void write(byte[] data) throws IOException {
-        socketChannelOutput.write(data);
-    }
-
-    @Override
-    public boolean isConnected() {
-        return channel != null && channel.isConnected();
-    }
-
-    @Override
-    public void close() {
-        IOUtils.closeQuietly(socketChannelOutput);
-        IOUtils.closeQuietly(channel);
-        socketChannelOutput = null;
-        channel = null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java
deleted file mode 100644
index bd73379..0000000
--- a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.pattern;
-
-import org.apache.nifi.processor.exception.ProcessException;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class TestExceptionHandler {
-
-    private static final Logger logger = LoggerFactory.getLogger(TestExceptionHandler.class);
-
-    /**
-     * Simulate an external procedure.
-     */
-    static class ExternalProcedure {
-        private boolean available = true;
-        int divide(Integer a, Integer b) throws Exception {
-            if (!available) {
-                throw new IOException("Not available");
-            }
-            if (a == 10) {
-                throw new IllegalStateException("Service for 10 is not currently available.");
-            }
-            return a / b;
-        }
-    }
-
-    private class Context {
-        int count = 0;
-    }
-
-    @Test
-    public void testBasicUsage() {
-
-        final ExternalProcedure p = new ExternalProcedure();
-
-        try {
-            // Although a catch-exception has to be caught each possible call,
-            // usually the error handling logic will be the same.
-            // Ends up having a lot of same code.
-            final int r1 = p.divide(4, 2);
-            assertEquals(2, r1);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-
-        final Context context = new Context();
-        final ExceptionHandler<Context> handler = new ExceptionHandler<>();
-
-        // Using handler can avoid the try catch block with reusable error handling logic.
-        handler.execute(context, 6, i -> {
-            final int r2 = p.divide(i, 2);
-            assertEquals(3, r2);
-        });
-
-        // If return value is needed, use AtomicReference.
-        AtomicReference<Integer> r = new AtomicReference<>();
-        handler.execute(context, 8, i -> r.set(p.divide(i, 2)));
-        assertEquals(4, r.get().intValue());
-
-        // If no exception mapping is specified, any Exception thrown is wrapped by ProcessException.
-        try {
-            final Integer nullInput = null;
-            handler.execute(context, nullInput, i -> r.set(p.divide(i, 2)));
-            fail("Exception should be thrown because input is null.");
-        } catch (ProcessException e) {
-            assertTrue(e.getCause() instanceof NullPointerException);
-        }
-    }
-
-    // Reusable Exception mapping function.
-    static Function<Exception, ErrorTypes> exceptionMapping = i -> {
-        try {
-            throw i;
-        } catch (NullPointerException | ArithmeticException | NumberFormatException e) {
-            return ErrorTypes.InvalidInput;
-        } catch (IllegalStateException e) {
-            return ErrorTypes.TemporalInputFailure;
-        } catch (IOException e) {
-            return ErrorTypes.TemporalFailure;
-        } catch (Exception e) {
-            throw new ProcessException(e);
-        }
-    };
-
-    @Test
-    public void testHandling() {
-
-        final ExternalProcedure p = new ExternalProcedure();
-        final Context context = new Context();
-
-        final ExceptionHandler<Context> handler = new ExceptionHandler<>();
-        handler.mapException(exceptionMapping);
-        handler.onError(createInputErrorHandler());
-
-        // Benefit of handler is being able to externalize error handling, make it simpler.
-        handler.execute(context, 4, i -> {
-            final int r = p.divide(i, 2);
-            assertEquals(2, r);
-        });
-
-        // Null pointer exception.
-        final Integer input = null;
-        handler.execute(context, input, i -> {
-            p.divide(i, 2);
-            fail("Shouldn't reach here.");
-        });
-
-        // Divide by zero.
-        handler.execute(context, 0, i -> {
-            p.divide(2, i);
-            fail("Shouldn't reach here.");
-        });
-
-
-    }
-
-    static <C> ExceptionHandler.OnError<C, Integer> createInputErrorHandler() {
-        return (c, i, r, e) -> {
-            switch (r.destination()) {
-                case ProcessException:
-                    throw new ProcessException(String.format("Execution failed due to %s", e), e);
-                default:
-                    logger.warn(String.format("Routing to %s: %d caused %s", r, i, e));
-            }
-        };
-    }
-
-    static <C> ExceptionHandler.OnError<C, Integer[]> createArrayInputErrorHandler() {
-        return (c, i, r, e) -> {
-            switch (r.destination()) {
-                case ProcessException:
-                    throw new ProcessException(String.format("Execution failed due to %s", e), e);
-                default:
-                    logger.warn(String.format("Routing to %s: %d, %d caused %s", r, i[0], i[1], e));
-            }
-        };
-    }
-
-    @Test
-    public void testHandlingLoop() {
-
-        final ExternalProcedure p = new ExternalProcedure();
-        final Context context = new Context();
-
-        final ExceptionHandler<Context> handler = new ExceptionHandler<>();
-        handler.mapException(exceptionMapping);
-        handler.onError(createArrayInputErrorHandler());
-
-        // It's especially handy when looping through inputs. [a, b, expected result]
-        Integer[][] inputs = new Integer[][]{{4, 2, 2}, {null, 2, 999}, {2, 0, 999}, {10, 2, 999}, {8, 2, 4}};
-
-        Arrays.stream(inputs).forEach(input ->  handler.execute(context, input, (in) -> {
-            final Integer r = p.divide(in[0], in[1]);
-            // This is safe because if p.divide throws error, this code won't be executed.
-            assertEquals(in[2], r);
-        }));
-
-        AtomicReference<Integer> r = new AtomicReference<>();
-        for (Integer[] input : inputs) {
-
-            if (!handler.execute(context, input, (in) -> {
-                r.set(p.divide(in[0], in[1]));
-                context.count++;
-            })){
-                // Handler returns false when it fails.
-                // Cleaner if-exception-continue-next-input can be written cleaner.
-                continue;
-            }
-
-            assertEquals(input[2], r.get());
-        }
-
-        assertEquals("Successful inputs", 2, context.count);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java
deleted file mode 100644
index 6d73759..0000000
--- a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.pattern;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.pattern.TestExceptionHandler.ExternalProcedure;
-import org.apache.nifi.util.MockComponentLog;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.nifi.processor.util.pattern.TestExceptionHandler.createArrayInputErrorHandler;
-import static org.apache.nifi.processor.util.pattern.TestExceptionHandler.exceptionMapping;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class TestRollbackOnFailure {
-
-    private static final Logger logger = LoggerFactory.getLogger(TestRollbackOnFailure.class);
-
-    /**
-     * This can be an example for how to compose an ExceptionHandler instance by reusable functions.
-     * @param logger used to log messages within functions
-     * @return a composed ExceptionHandler
-     */
-    private ExceptionHandler<RollbackOnFailure> getContextAwareExceptionHandler(ComponentLog logger) {
-        final ExceptionHandler<RollbackOnFailure> handler = new ExceptionHandler<>();
-        handler.mapException(exceptionMapping);
-        handler.adjustError(RollbackOnFailure.createAdjustError(logger));
-        handler.onError(createArrayInputErrorHandler());
-        return handler;
-    }
-
-    private void processInputs(RollbackOnFailure context, Integer[][] inputs, List<Integer> results) {
-        final ExternalProcedure p = new ExternalProcedure();
-        final MockComponentLog componentLog = new MockComponentLog("processor-id", this);
-        final ExceptionHandler<RollbackOnFailure> handler = getContextAwareExceptionHandler(componentLog);
-
-        for (Integer[] input : inputs) {
-
-            if (!handler.execute(context, input, (in) -> {
-                results.add(p.divide(in[0], in[1]));
-                context.proceed();
-            })){
-                continue;
-            }
-
-            assertEquals(input[2], results.get(results.size() - 1));
-        }
-    }
-
-    @Test
-    public void testContextDefaultBehavior() {
-
-        // Disabling rollbackOnFailure would route Failure or Retry as they are.
-        final RollbackOnFailure context = new RollbackOnFailure(false, false);
-
-        Integer[][] inputs = new Integer[][]{{null, 2, 999}, {4, 2, 2}, {2, 0, 999}, {10, 2, 999}, {8, 2, 4}};
-
-        final List<Integer> results = new ArrayList<>();
-        try {
-            processInputs(context, inputs, results);
-        } catch (ProcessException e) {
-            fail("ProcessException should NOT be thrown");
-        }
-
-        assertEquals("Successful inputs", 2, context.getProcessedCount());
-    }
-
-    @Test
-    public void testContextRollbackOnFailureNonTransactionalFirstFailure() {
-
-        final RollbackOnFailure context = new RollbackOnFailure(true, false);
-
-        // If the first execution fails without any succeeded inputs, it should throw a ProcessException.
-        Integer[][] inputs = new Integer[][]{{null, 2, 999}, {4, 2, 2}, {2, 0, 999}, {10, 2, 999}, {8, 2, 4}};
-
-        final List<Integer> results = new ArrayList<>();
-        try {
-            processInputs(context, inputs, results);
-            fail("ProcessException should be thrown");
-        } catch (ProcessException e) {
-            logger.info("Exception was thrown as expected.");
-        }
-
-        assertEquals("Successful inputs", 0, context.getProcessedCount());
-    }
-
-    @Test
-    public void testContextRollbackOnFailureNonTransactionalAlreadySucceeded() {
-
-        final RollbackOnFailure context = new RollbackOnFailure(true, false);
-
-        // If an execution fails after succeeded inputs, it transfer the input to Failure instead of ProcessException,
-        // and keep going. Because the external system does not support transaction.
-        Integer[][] inputs = new Integer[][]{{4, 2, 2}, {2, 0, 999}, {null, 2, 999}, {10, 2, 999}, {8, 2, 4}};
-
-        final List<Integer> results = new ArrayList<>();
-        try {
-            processInputs(context, inputs, results);
-        } catch (ProcessException e) {
-            fail("ProcessException should NOT be thrown");
-        }
-
-        assertEquals("Successful inputs", 2, context.getProcessedCount());
-    }
-
-    @Test
-    public void testContextRollbackOnFailureTransactionalAlreadySucceeded() {
-
-        final RollbackOnFailure context = new RollbackOnFailure(true, true);
-
-        // Even if an execution fails after succeeded inputs, it transfer the input to Failure,
-        // because the external system supports transaction.
-        Integer[][] inputs = new Integer[][]{{4, 2, 2}, {2, 0, 999}, {null, 2, 999}, {10, 2, 999}, {8, 2, 4}};
-
-        final List<Integer> results = new ArrayList<>();
-        try {
-            processInputs(context, inputs, results);
-            fail("ProcessException should be thrown");
-        } catch (ProcessException e) {
-            logger.info("Exception was thrown as expected.");
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/pom.xml b/nifi-commons/nifi-record/pom.xml
new file mode 100644
index 0000000..57e0e66
--- /dev/null
+++ b/nifi-commons/nifi-record/pom.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-commons</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-record</artifactId>
+    <description>
+        This module contains the domain model for NiFi's Record abstraction, including
+        several interfaces for interacting with Records. This module should not depend
+        on any external libraries.
+    </description>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java
new file mode 100644
index 0000000..2fe06f4
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+public enum SchemaField {
+    SCHEMA_TEXT("Schema Text"),
+    SCHEMA_TEXT_FORMAT("Schema Text Format"),
+    SCHEMA_NAME("Schema Name"),
+    SCHEMA_IDENTIFIER("Schema Identifier"),
+    SCHEMA_VERSION("Schema Version");
+
+    private final String description;
+
+    private SchemaField(final String description) {
+        this.description = description;
+    }
+
+    @Override
+    public String toString() {
+        return description;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java
new file mode 100644
index 0000000..9a064ff
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+public class SchemaNotFoundException extends Exception {
+    public SchemaNotFoundException(final String message) {
+        super(message);
+    }
+
+    public SchemaNotFoundException(final String message, final Throwable cause) {
+        super(message, cause); // keep the caller's message; super(cause) alone would discard it
+    }
+
+    public SchemaNotFoundException(final Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/MalformedRecordException.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/MalformedRecordException.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/MalformedRecordException.java
new file mode 100644
index 0000000..d45a850
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/MalformedRecordException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+/**
+ * An Exception that can be thrown to indicate that data was read but could not properly be parsed.
+ * It is a checked exception. Both constructors pass their arguments straight through to
+ * {@link Exception}, so the message (and the cause, when one is supplied) are available from
+ * {@link #getMessage()} and {@link #getCause()}.
+ */
+public class MalformedRecordException extends Exception {
+    public MalformedRecordException(final String message) {
+        super(message);
+    }
+
+    public MalformedRecordException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java
new file mode 100644
index 0000000..add248e
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+/**
+ * <p>
+ * A RecordReader is responsible for parsing data and returning a record at a time
+ * in order to allow the caller to iterate over the records individually.
+ * </p>
+ *
+ * <p>
+ * PLEASE NOTE: This interface is still considered 'unstable' and may change in a non-backward-compatible
+ * manner between minor or incremental releases of NiFi.
+ * </p>
+ */
+public interface RecordReader extends Closeable {
+
+    /**
+     * Returns the next record in the stream or <code>null</code> if no more records are available.
+     *
+     * @return the next record in the stream or <code>null</code> if no more records are available.
+     *
+     * @throws IOException if unable to read from the underlying data
+     * @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse a record
+     */
+    Record nextRecord() throws IOException, MalformedRecordException;
+
+    /**
+     * @return a RecordSchema that is appropriate for the records in the stream
+     * @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse the underlying data
+     */
+    RecordSchema getSchema() throws MalformedRecordException;
+
+    /**
+     * Creates a view of this reader as a {@link RecordSet}. The returned RecordSet holds no records
+     * of its own: its <code>getSchema()</code> and <code>next()</code> methods delegate to this
+     * reader's {@link #getSchema()} and {@link #nextRecord()}. Because those RecordSet methods only
+     * declare {@link IOException}, any {@link MalformedRecordException} raised by this reader is
+     * rethrown wrapped as the cause of an IOException.
+     *
+     * @return a RecordSet that returns the records in this Record Reader in a streaming fashion
+     */
+    default RecordSet createRecordSet() {
+        return new RecordSet() {
+            @Override
+            public RecordSchema getSchema() throws IOException {
+                try {
+                    return RecordReader.this.getSchema();
+                } catch (final MalformedRecordException mre) {
+                    throw new IOException(mre);
+                }
+            }
+
+            @Override
+            public Record next() throws IOException {
+                try {
+                    return RecordReader.this.nextRecord();
+                } catch (final MalformedRecordException mre) {
+                    throw new IOException(mre);
+                }
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java
new file mode 100644
index 0000000..7d6fa1c
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.serialization.record.RecordSet;
+
+/**
+ * <p>
+ * A RecordSetWriter is responsible for writing a {@link RecordSet} to a given {@link OutputStream}.
+ * It extends {@link RecordWriter}, so implementations also provide that interface's methods.
+ * </p>
+ *
+ * <p>
+ * PLEASE NOTE: This interface is still considered 'unstable' and may change in a non-backward-compatible
+ * manner between minor or incremental releases of NiFi.
+ * </p>
+ */
+public interface RecordSetWriter extends RecordWriter {
+    /**
+     * Writes the given record set to the given output stream
+     *
+     * @param recordSet the record set to serialize
+     * @param out the OutputStream to write to
+     * @return the results of writing the data
+     * @throws IOException if unable to write to the given OutputStream
+     */
+    WriteResult write(RecordSet recordSet, OutputStream out) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
new file mode 100644
index 0000000..aa298d9
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.serialization.record.Record;
+
+public interface RecordWriter {
+    /**
+     * Writes the given record to the given output stream
+     *
+     * @param record the record to serialize
+     * @param out the OutputStream to write to
+     * @return the results of writing the data
+     * @throws IOException if unable to write to the given OutputStream
+     */
+    WriteResult write(Record record, OutputStream out) throws IOException;
+
+    /**
+     * @return the MIME Type that this Record Writer produces. This will be added to FlowFiles using
+     *         the mime.type attribute.
+     */
+    String getMimeType();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
new file mode 100644
index 0000000..017aef1
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
+/**
+ * A {@link RecordSchema} backed by an unmodifiable copy of the supplied field list, together with
+ * a lookup table that maps every field name and every alias to the position of its field.
+ */
+public class SimpleRecordSchema implements RecordSchema {
+    private final List<RecordField> fields;
+    private final Map<String, Integer> fieldIndices;
+    private final boolean textAvailable;
+    private final String text;
+    private final String schemaFormat;
+    private final SchemaIdentifier schemaIdentifier;
+
+    /**
+     * Creates a schema with no format and {@link SchemaIdentifier#EMPTY} as its identifier.
+     * A textual description is generated from the fields for {@link #toString()}, but it is
+     * not exposed through {@link #getSchemaText()}.
+     *
+     * @param fields the fields of the schema, in order
+     * @throws IllegalArgumentException if two fields share a name or alias
+     */
+    public SimpleRecordSchema(final List<RecordField> fields) {
+        this(fields, createText(fields), null, false, SchemaIdentifier.EMPTY);
+    }
+
+    /**
+     * Creates a schema with no format and the given identifier. A textual description is
+     * generated from the fields for {@link #toString()}, but it is not exposed through
+     * {@link #getSchemaText()}.
+     *
+     * @param fields the fields of the schema, in order
+     * @param id the identifier to report from {@link #getIdentifier()}
+     * @throws IllegalArgumentException if two fields share a name or alias
+     */
+    public SimpleRecordSchema(final List<RecordField> fields, final SchemaIdentifier id) {
+        this(fields, createText(fields), null, false, id);
+    }
+
+    /**
+     * Creates a schema that carries caller-supplied schema text and format.
+     *
+     * @param fields the fields of the schema, in order
+     * @param text the schema text to report from {@link #getSchemaText()} and {@link #toString()}
+     * @param schemaFormat the format to report from {@link #getSchemaFormat()}
+     * @param id the identifier to report from {@link #getIdentifier()}
+     * @throws IllegalArgumentException if two fields share a name or alias
+     */
+    public SimpleRecordSchema(final List<RecordField> fields, final String text, final String schemaFormat, final SchemaIdentifier id) {
+        this(fields, text, schemaFormat, true, id);
+    }
+
+    private SimpleRecordSchema(final List<RecordField> fields, final String text, final String schemaFormat, final boolean textAvailable, final SchemaIdentifier id) {
+        this.text = text;
+        this.schemaFormat = schemaFormat;
+        this.schemaIdentifier = id;
+        this.textAvailable = textAvailable;
+        // Copy before wrapping so later changes to the caller's list cannot affect this schema.
+        this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
+        this.fieldIndices = new HashMap<>(fields.size());
+
+        int index = 0;
+        for (final RecordField field : fields) {
+            registerName(field.getFieldName(), index);
+
+            for (final String alias : field.getAliases()) {
+                registerName(alias, index);
+            }
+
+            index++;
+        }
+    }
+
+    /**
+     * Maps the given field name or alias to the given field position.
+     *
+     * @param name the field name or alias to register
+     * @param index the position of the owning field in the field list
+     * @throws IllegalArgumentException if the name or alias was already registered; the message
+     *             names the identifier that actually collided
+     */
+    private void registerName(final String name, final int index) {
+        final Integer previousValue = fieldIndices.put(name, index);
+        if (previousValue != null) {
+            throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + name + "'");
+        }
+    }
+
+    /**
+     * @return the schema text if it was supplied by the caller at construction time, otherwise an
+     *         empty Optional (the generated description is deliberately not returned here)
+     */
+    @Override
+    public Optional<String> getSchemaText() {
+        if (textAvailable) {
+            return Optional.ofNullable(text);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+
+    /**
+     * @return the schema format supplied at construction time, or an empty Optional if none was given
+     */
+    @Override
+    public Optional<String> getSchemaFormat() {
+        return Optional.ofNullable(schemaFormat);
+    }
+
+    /**
+     * @return the unmodifiable list of fields, in the order supplied at construction time
+     */
+    @Override
+    public List<RecordField> getFields() {
+        return fields;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return fields.size();
+    }
+
+    @Override
+    public RecordField getField(final int index) {
+        return fields.get(index);
+    }
+
+    /**
+     * @return a newly built list holding the data type of each field, in field order
+     */
+    @Override
+    public List<DataType> getDataTypes() {
+        return getFields().stream().map(RecordField::getDataType)
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * @return a newly built list holding the name of each field, in field order (aliases are not included)
+     */
+    @Override
+    public List<String> getFieldNames() {
+        return getFields().stream().map(RecordField::getFieldName)
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * @param fieldName a field name or alias
+     * @return the data type of the matching field, or an empty Optional if no field has that name or alias
+     */
+    @Override
+    public Optional<DataType> getDataType(final String fieldName) {
+        final OptionalInt idx = getFieldIndex(fieldName);
+        return idx.isPresent() ? Optional.of(fields.get(idx.getAsInt()).getDataType()) : Optional.empty();
+    }
+
+    /**
+     * @param fieldName a field name or alias
+     * @return the matching field, or an empty Optional if no field has that name or alias
+     */
+    @Override
+    public Optional<RecordField> getField(final String fieldName) {
+        final OptionalInt indexOption = getFieldIndex(fieldName);
+        if (indexOption.isPresent()) {
+            return Optional.of(fields.get(indexOption.getAsInt()));
+        }
+
+        return Optional.empty();
+    }
+
+    private OptionalInt getFieldIndex(final String fieldName) {
+        final Integer index = fieldIndices.get(fieldName);
+        return index == null ? OptionalInt.empty() : OptionalInt.of(index);
+    }
+
+    /**
+     * Two schemas are considered equal when their field lists are equal; schema text, format and
+     * identifier are not compared. Any {@link RecordSchema} implementation is accepted.
+     */
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (obj == this) {
+            return true;
+        }
+        if (!(obj instanceof RecordSchema)) {
+            return false;
+        }
+
+        final RecordSchema other = (RecordSchema) obj;
+        return fields.equals(other.getFields());
+    }
+
+    @Override
+    public int hashCode() {
+        return 143 + 3 * fields.hashCode();
+    }
+
+    /**
+     * Builds the generated description used when the caller supplies no schema text:
+     * a bracketed, comma-separated list of <code>"name" : "type"</code> pairs.
+     */
+    private static String createText(final List<RecordField> fields) {
+        final StringBuilder sb = new StringBuilder("[");
+
+        for (int i = 0; i < fields.size(); i++) {
+            final RecordField field = fields.get(i);
+
+            sb.append("\"");
+            sb.append(field.getFieldName());
+            sb.append("\" : \"");
+            sb.append(field.getDataType());
+            sb.append("\"");
+
+            if (i < fields.size() - 1) {
+                sb.append(", ");
+            }
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+
+    /**
+     * @return the caller-supplied schema text, or the generated description when none was supplied;
+     *         this is <code>null</code> if the caller explicitly passed <code>null</code> text
+     */
+    @Override
+    public String toString() {
+        return text;
+    }
+
+    @Override
+    public SchemaIdentifier getIdentifier() {
+        return schemaIdentifier;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/WriteResult.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/WriteResult.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/WriteResult.java
new file mode 100644
index 0000000..3fb2741
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/WriteResult.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * <p>
+ * Describes the outcome of writing records to an OutputStream, for example by a {@link RecordSetWriter}:
+ * how many records were written and which attributes should accompany the output.
+ * Instances are normally obtained from the static factory {@link WriteResult#of(int, Map)}
+ * or by using the shared {@link WriteResult#EMPTY} instance.
+ * </p>
+ *
+ * <p>
+ * PLEASE NOTE: This interface is still considered 'unstable' and may change in a non-backward-compatible
+ * manner between minor or incremental releases of NiFi.
+ * </p>
+ */
+public interface WriteResult {
+
+    /**
+     * A result reporting zero records and an empty attribute map.
+     */
+    WriteResult EMPTY = of(0, Collections.emptyMap());
+
+    /**
+     * @return the number of records written
+     */
+    int getRecordCount();
+
+    /**
+     * @return values that should be added to the FlowFile as attributes
+     */
+    Map<String, String> getAttributes();
+
+    /**
+     * Builds a WriteResult that reports exactly the values passed in. The attribute map is
+     * returned as given; it is neither copied nor wrapped.
+     *
+     * @param recordCount the number of records written
+     * @param attributes the attributes to add to the FlowFile
+     * @return A {@link WriteResult} representing the given parameters
+     */
+    static WriteResult of(final int recordCount, final Map<String, String> attributes) {
+        return new WriteResult() {
+            @Override
+            public Map<String, String> getAttributes() {
+                return attributes;
+            }
+
+            @Override
+            public int getRecordCount() {
+                return recordCount;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/DataType.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/DataType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/DataType.java
new file mode 100644
index 0000000..6ed4bd6
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/DataType.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.util.Objects;
+
+/**
+ * Pairs a {@link RecordFieldType} with an optional format string. Equality, hashing and the
+ * string form are all derived from those two values as reported by the getters.
+ */
+public class DataType {
+    private final RecordFieldType fieldType;
+    private final String format;
+
+    /**
+     * @param fieldType the type of the field
+     * @param format the format associated with the type; may be <code>null</code>
+     */
+    protected DataType(final RecordFieldType fieldType, final String format) {
+        this.fieldType = fieldType;
+        this.format = format;
+    }
+
+    /**
+     * @return the format given at construction time, possibly <code>null</code>
+     */
+    public String getFormat() {
+        return format;
+    }
+
+    /**
+     * @return the field type given at construction time
+     */
+    public RecordFieldType getFieldType() {
+        return fieldType;
+    }
+
+    @Override
+    public int hashCode() {
+        final int typeHash = getFieldType().hashCode();
+        // Objects.hashCode yields 0 for a null format
+        final int formatHash = Objects.hashCode(getFormat());
+        return 31 + 41 * typeHash + 41 * formatHash;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        // instanceof is false for null, so this also rejects a null argument
+        if (!(obj instanceof DataType)) {
+            return false;
+        }
+
+        final DataType that = (DataType) obj;
+        if (!getFieldType().equals(that.getFieldType())) {
+            return false;
+        }
+        return Objects.equals(getFormat(), that.getFormat());
+    }
+
+    /**
+     * @return the field type's string form, followed by ":" and the format when a format is set
+     */
+    @Override
+    public String toString() {
+        final String formatValue = getFormat();
+        final String typeName = getFieldType().toString();
+        return formatValue == null ? typeName : typeName + ":" + formatValue;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ListRecordSet.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ListRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ListRecordSet.java
new file mode 100644
index 0000000..3968f50
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ListRecordSet.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A {@link RecordSet} that serves records from a snapshot of a list. The list is copied at
+ * construction time and the records are handed out one by one through an iterator over that copy.
+ */
+public class ListRecordSet implements RecordSet {
+    private final Iterator<Record> recordItr;
+    private final RecordSchema schema;
+
+    /**
+     * @param schema the schema to report from {@link #getSchema()}
+     * @param records the records to serve; copied, so later changes to this list are not seen
+     */
+    public ListRecordSet(final RecordSchema schema, final List<Record> records) {
+        this.recordItr = new ArrayList<Record>(records).iterator();
+        this.schema = schema;
+    }
+
+    @Override
+    public RecordSchema getSchema() {
+        return schema;
+    }
+
+    /**
+     * @return the next record of the snapshot, or <code>null</code> once all records have been returned
+     */
+    @Override
+    public Record next() {
+        if (!recordItr.hasNext()) {
+            return null;
+        }
+        return recordItr.next();
+    }
+}


Mime
View raw message