nifi-commits mailing list archives

From bbe...@apache.org
Subject [2/3] nifi git commit: NIFI-1420 Adding Splunk bundle containing PutSplunk and GetSplunk, and adding a ListenTCP processor to standard processors. Refactored internal code from PutSyslog to create a generic AbstractPutEventProcessor which PutSplunk extends.
Date Mon, 07 Mar 2016 23:21:56 GMT
http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java
new file mode 100644
index 0000000..b9d9e0b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+
+import com.splunk.JobExportArgs;
+import com.splunk.SSLSecurityProtocol;
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnRemoved;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@TriggerSerially
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"get", "splunk", "logs"})
+@CapabilityDescription("Retrieves data from Splunk Enterprise.")
+@WritesAttributes({
+        @WritesAttribute(attribute="splunk.query", description = "The query that performed to produce the FlowFile."),
+        @WritesAttribute(attribute="splunk.earliest.time", description = "The value of the earliest time that was used when performing the query."),
+        @WritesAttribute(attribute="splunk.latest.time", description = "The value of the latest time that was used when performing the query.")
+})
+@Stateful(scopes = Scope.CLUSTER, description = "If using one of the managed Time Range Strategies, this processor will " +
+        "store the values of the latest and earliest times from the previous execution so that the next execution of the " +
+        "can pick up where the last execution left off. The state will be cleared and start over if the query is changed.")
+public class GetSplunk extends AbstractProcessor {
+
+    public static final String HTTP_SCHEME = "http";
+    public static final String HTTPS_SCHEME = "https";
+
+    public static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
+            .name("Scheme")
+            .description("The scheme for connecting to Splunk.")
+            .allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
+            .defaultValue(HTTPS_SCHEME)
+            .required(true)
+            .build();
+    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Hostname")
+            .description("The ip address or hostname of the Splunk server.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("localhost")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("Port")
+            .description("The port of the Splunk server.")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .defaultValue("8089")
+            .build();
+    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
+            .name("Query")
+            .description("The query to execute. Typically beginning with a <search> command followed by a search clause, " +
+                    "such as <search source=\"tcp:7689\"> to search for messages received on TCP port 7689.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("search * | head 100")
+            .required(true)
+            .build();
+
+    public static final AllowableValue MANAGED_BEGINNING_VALUE = new AllowableValue("Managed from Beginning", "Managed from Beginning",
+            "The processor will manage the date ranges of the query starting from the beginning of time.");
+    public static final AllowableValue MANAGED_CURRENT_VALUE = new AllowableValue("Managed from Current", "Managed from Current",
+            "The processor will manage the date ranges of the query starting from the current time.");
+    public static final AllowableValue PROVIDED_VALUE = new AllowableValue("Provided", "Provided",
+            "The the time range provided through the Earliest Time and Latest Time properties will be used.");
+
+    public static final PropertyDescriptor TIME_RANGE_STRATEGY = new PropertyDescriptor.Builder()
+            .name("Time Range Strategy")
+            .description("Indicates how to apply time ranges to each execution of the query. Selecting a managed option " +
+                    "allows the processor to apply a time range from the last execution time to the current execution time. " +
+                    "When using <Managed from Beginning>, an earliest time will not be applied on the first execution, and thus all " +
+                    "records searched. When using <Managed from Current> the earliest time of the first execution will be the " +
+                    "initial execution time. When using <Provided>, the time range will come from the Earliest Time and Latest Time " +
+                    "properties, or no time range will be applied if these properties are left blank.")
+            .allowableValues(MANAGED_BEGINNING_VALUE, MANAGED_CURRENT_VALUE, PROVIDED_VALUE)
+            .defaultValue(PROVIDED_VALUE.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor EARLIEST_TIME = new PropertyDescriptor.Builder()
+            .name("Earliest Time")
+            .description("The value to use for the earliest time when querying. Only used with a Time Range Strategy of Provided. " +
+                    "See Splunk's documentation on Search Time Modifiers for guidance in populating this field.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .build();
+    public static final PropertyDescriptor LATEST_TIME = new PropertyDescriptor.Builder()
+            .name("Latest Time")
+            .description("The value to use for the latest time when querying. Only used with a Time Range Strategy of Provided. " +
+                    "See Splunk's documentation on Search Time Modifiers for guidance in populating this field.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .build();
+    public static final PropertyDescriptor APP = new PropertyDescriptor.Builder()
+            .name("Application")
+            .description("The Splunk Application to query.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .build();
+    public static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
+            .name("Owner")
+            .description("The owner to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .build();
+    public static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
+            .name("Token")
+            .description("The token to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .build();
+    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("Username")
+            .description("The username to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .build();
+    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("Password")
+            .description("The password to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .sensitive(true)
+            .build();
+
+    public static final AllowableValue ATOM_VALUE = new AllowableValue(JobExportArgs.OutputMode.ATOM.name(), JobExportArgs.OutputMode.ATOM.name());
+    public static final AllowableValue CSV_VALUE = new AllowableValue(JobExportArgs.OutputMode.CSV.name(), JobExportArgs.OutputMode.CSV.name());
+    public static final AllowableValue JSON_VALUE = new AllowableValue(JobExportArgs.OutputMode.JSON.name(), JobExportArgs.OutputMode.JSON.name());
+    public static final AllowableValue JSON_COLS_VALUE = new AllowableValue(JobExportArgs.OutputMode.JSON_COLS.name(), JobExportArgs.OutputMode.JSON_COLS.name());
+    public static final AllowableValue JSON_ROWS_VALUE = new AllowableValue(JobExportArgs.OutputMode.JSON_ROWS.name(), JobExportArgs.OutputMode.JSON_ROWS.name());
+    public static final AllowableValue RAW_VALUE = new AllowableValue(JobExportArgs.OutputMode.RAW.name(), JobExportArgs.OutputMode.RAW.name());
+    public static final AllowableValue XML_VALUE = new AllowableValue(JobExportArgs.OutputMode.XML.name(), JobExportArgs.OutputMode.XML.name());
+
+    public static final PropertyDescriptor OUTPUT_MODE = new PropertyDescriptor.Builder()
+            .name("Output Mode")
+            .description("The output mode for the results.")
+            .allowableValues(ATOM_VALUE, CSV_VALUE, JSON_VALUE, JSON_COLS_VALUE, JSON_ROWS_VALUE, RAW_VALUE, XML_VALUE)
+            .defaultValue(JSON_VALUE.getValue())
+            .required(true)
+            .build();
+
+    public static final AllowableValue TLS_1_2_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), SSLSecurityProtocol.TLSv1_2.name());
+    public static final AllowableValue TLS_1_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), SSLSecurityProtocol.TLSv1_1.name());
+    public static final AllowableValue TLS_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1.name(), SSLSecurityProtocol.TLSv1.name());
+    public static final AllowableValue SSL_3_VALUE = new AllowableValue(SSLSecurityProtocol.SSLv3.name(), SSLSecurityProtocol.SSLv3.name());
+
+    public static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
+            .name("Security Protocol")
+            .description("The security protocol to use for communicating with Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, SSL_3_VALUE)
+            .defaultValue(TLS_1_2_VALUE.getValue())
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Results retrieved from Splunk are sent out this relationship.")
+            .build();
+
+    public static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+    public static final String EARLIEST_TIME_KEY = "earliestTime";
+    public static final String LATEST_TIME_KEY = "latestTime";
+
+    public static final String QUERY_ATTR = "splunk.query";
+    public static final String EARLIEST_TIME_ATTR = "splunk.earliest.time";
+    public static final String LATEST_TIME_ATTR = "splunk.latest.time";
+
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> descriptors;
+
+    private volatile String transitUri;
+    private volatile boolean resetState = false;
+    private volatile Service splunkService;
+    protected final AtomicBoolean isInitialized = new AtomicBoolean(false);
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(SCHEME);
+        descriptors.add(HOSTNAME);
+        descriptors.add(PORT);
+        descriptors.add(QUERY);
+        descriptors.add(TIME_RANGE_STRATEGY);
+        descriptors.add(EARLIEST_TIME);
+        descriptors.add(LATEST_TIME);
+        descriptors.add(APP);
+        descriptors.add(OWNER);
+        descriptors.add(TOKEN);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+        descriptors.add(SECURITY_PROTOCOL);
+        descriptors.add(OUTPUT_MODE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public final Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final Collection<ValidationResult> results = new ArrayList<>();
+
+        final String scheme = validationContext.getProperty(SCHEME).getValue();
+        final String secProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue();
+
+        if (HTTPS_SCHEME.equals(scheme) && StringUtils.isBlank(secProtocol)) {
+            results.add(new ValidationResult.Builder()
+                    .explanation("Security Protocol must be specified when using HTTPS")
+                    .valid(false).subject("Security Protocol").build());
+        }
+
+        final String username = validationContext.getProperty(USERNAME).getValue();
+        final String password = validationContext.getProperty(PASSWORD).getValue();
+
+        if (!StringUtils.isBlank(username) && StringUtils.isBlank(password)) {
+            results.add(new ValidationResult.Builder()
+                    .explanation("Password must be specified when providing a Username")
+                    .valid(false).subject("Password").build());
+        }
+
+        return results;
+    }
+
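+    // Changing any of the properties checked below makes the stored time range stale
+    // (a different query or a different server), so flag the state for clearing on
+    // the next @OnScheduled call.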
+    @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+        if ( ((oldValue != null && !oldValue.equals(newValue)) || (oldValue == null && newValue != null))
+                && (descriptor.equals(QUERY)
+                || descriptor.equals(TIME_RANGE_STRATEGY)
+                || descriptor.equals(EARLIEST_TIME)
+                || descriptor.equals(LATEST_TIME)
+                || descriptor.equals(HOSTNAME))
+                ) {
+            resetState = true;
+        }
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        final String scheme = context.getProperty(SCHEME).getValue();
+        final String host = context.getProperty(HOSTNAME).getValue();
+        final int port = context.getProperty(PORT).asInteger();
+        transitUri = new StringBuilder().append(scheme).append("://").append(host).append(":").append(port).toString();
+
+        // if properties changed since last execution then remove any previous state
+        if (resetState) {
+            try {
+                context.getStateManager().clear(Scope.CLUSTER);
+            } catch (final IOException ioe) {
+                getLogger().warn("Failed to clear state", ioe);
+            }
+            resetState = false;
+        }
+    }
+
+    @OnStopped
+    public void onStopped() {
+        if (splunkService != null) {
+            isInitialized.set(false);
+            splunkService.logout();
+            splunkService = null;
+        }
+    }
+
+    @OnRemoved
+    public void onRemoved(final ProcessContext context) {
+        try {
+            context.getStateManager().clear(Scope.CLUSTER);
+        } catch (IOException e) {
+           getLogger().error("Unable to clear processor state due to {}", new Object[] {e.getMessage()}, e);
+        }
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final long currentTime = System.currentTimeMillis();
+
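+        // lazily (re)create the Splunk service on the first trigger after a start,
+        // since onStopped logs out and nulls any previous service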
+        synchronized (isInitialized) {
+            if (!isInitialized.get()) {
+                splunkService = createSplunkService(context);
+                isInitialized.set(true);
+            }
+        }
+
+        final String query = context.getProperty(QUERY).getValue();
+        final String outputMode = context.getProperty(OUTPUT_MODE).getValue();
+        final String timeRangeStrategy = context.getProperty(TIME_RANGE_STRATEGY).getValue();
+
+        final JobExportArgs exportArgs = new JobExportArgs();
+        exportArgs.setSearchMode(JobExportArgs.SearchMode.NORMAL);
+        exportArgs.setOutputMode(JobExportArgs.OutputMode.valueOf(outputMode));
+
+        String earliestTime = null;
+        String latestTime = null;
+
+        if (PROVIDED_VALUE.getValue().equals(timeRangeStrategy)) {
+            // for provided we just use the values of the properties
+            earliestTime = context.getProperty(EARLIEST_TIME).getValue();
+            latestTime = context.getProperty(LATEST_TIME).getValue();
+        } else {
+            try {
+                // not provided so we need to check the previous state
+                final TimeRange previousRange = loadState(context.getStateManager());
+                final SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_TIME_FORMAT);
+
+                if (previousRange == null) {
+                    // no previous state so set the earliest time based on the strategy
+                    if (MANAGED_CURRENT_VALUE.getValue().equals(timeRangeStrategy)) {
+                        earliestTime = dateFormat.format(new Date(currentTime));
+                    }
+
+                    // no previous state so set the latest time to the current time
+                    latestTime = dateFormat.format(new Date(currentTime));
+
+                    // if it's the first time through, don't actually run; just save the state to get the
+                    // initial time saved and next execution will be the first real execution
+                    if (latestTime.equals(earliestTime)) {
+                        saveState(context.getStateManager(), new TimeRange(earliestTime, latestTime));
+                        return;
+                    }
+
+                } else {
+                    // we have previous state so set earliestTime to latestTime of last range
+                    earliestTime = previousRange.getLatestTime();
+                    latestTime = dateFormat.format(new Date(currentTime));
+                }
+
+            } catch (IOException e) {
+                getLogger().error("Unable to load data from State Manager due to {}", new Object[] {e.getMessage()}, e);
+                context.yield();
+                return;
+            }
+        }
+
+        if (!StringUtils.isBlank(earliestTime)) {
+            exportArgs.setEarliestTime(earliestTime);
+        }
+
+        if (!StringUtils.isBlank(latestTime)) {
+            exportArgs.setLatestTime(latestTime);
+        }
+
+        getLogger().debug("Using earliestTime of {} and latestTime of {}", new Object[] {earliestTime, latestTime});
+
+        final InputStream exportSearch = splunkService.export(query, exportArgs);
+
+        FlowFile flowFile = session.create();
+        flowFile = session.write(flowFile, new OutputStreamCallback() {
+            @Override
+            public void process(OutputStream rawOut) throws IOException {
+                try (BufferedOutputStream out = new BufferedOutputStream(rawOut)) {
+                    IOUtils.copyLarge(exportSearch, out);
+                }
+            }
+        });
+
+        final Map<String,String> attributes = new HashMap<>(3);
+        attributes.put(EARLIEST_TIME_ATTR, earliestTime);
+        attributes.put(LATEST_TIME_ATTR, latestTime);
+        attributes.put(QUERY_ATTR, query);
+        flowFile = session.putAllAttributes(flowFile, attributes);
+
+        session.getProvenanceReporter().receive(flowFile, transitUri);
+        session.transfer(flowFile, REL_SUCCESS);
+        getLogger().debug("Received {} from Splunk", new Object[] {flowFile});
+
+        // save the time range for the next execution to pick up where we left off
+        // if saving fails then roll back the session so we can try again next execution
+        // only need to do this for the managed time strategies
+        if (!PROVIDED_VALUE.getValue().equals(timeRangeStrategy)) {
+            try {
+                saveState(context.getStateManager(), new TimeRange(earliestTime, latestTime));
+            } catch (IOException e) {
+                getLogger().error("Unable to load data from State Manager due to {}", new Object[]{e.getMessage()}, e);
+                session.rollback();
+                context.yield();
+            }
+        }
+    }
+
+    protected Service createSplunkService(final ProcessContext context) {
+        final ServiceArgs serviceArgs = new ServiceArgs();
+
+        final String scheme = context.getProperty(SCHEME).getValue();
+        serviceArgs.setScheme(scheme);
+
+        final String host = context.getProperty(HOSTNAME).getValue();
+        serviceArgs.setHost(host);
+
+        final int port = context.getProperty(PORT).asInteger();
+        serviceArgs.setPort(port);
+
+        final String app = context.getProperty(APP).getValue();
+        if (!StringUtils.isBlank(app)) {
+            serviceArgs.setApp(app);
+        }
+
+        final String owner = context.getProperty(OWNER).getValue();
+        if (!StringUtils.isBlank(owner)) {
+            serviceArgs.setOwner(owner);
+        }
+
+        final String token = context.getProperty(TOKEN).getValue();
+        if (!StringUtils.isBlank(token)) {
+            serviceArgs.setToken(token);
+        }
+
+        final String username = context.getProperty(USERNAME).getValue();
+        if (!StringUtils.isBlank(username)) {
+            serviceArgs.setUsername(username);
+        }
+
+        final String password = context.getProperty(PASSWORD).getValue();
+        if (!StringUtils.isBlank(password)) {
+            serviceArgs.setPassword(password);
+        }
+
+        final String secProtocol = context.getProperty(SECURITY_PROTOCOL).getValue();
+        if (!StringUtils.isBlank(secProtocol) && HTTPS_SCHEME.equals(scheme)) {
+            serviceArgs.setSSLSecurityProtocol(SSLSecurityProtocol.valueOf(secProtocol));
+        }
+
+        return Service.connect(serviceArgs);
+    }
+
+    private void saveState(StateManager stateManager, TimeRange timeRange) throws IOException {
+        final String earliest = StringUtils.isBlank(timeRange.getEarliestTime()) ? "" : timeRange.getEarliestTime();
+        final String latest = StringUtils.isBlank(timeRange.getLatestTime()) ? "" : timeRange.getLatestTime();
+
+        Map<String,String> state = new HashMap<>(2);
+        state.put(EARLIEST_TIME_KEY, earliest);
+        state.put(LATEST_TIME_KEY, latest);
+
+        getLogger().debug("Saving state with earliestTime of {} and latestTime of {}", new Object[] {earliest, latest});
+        stateManager.setState(state, Scope.CLUSTER);
+    }
+
+    private TimeRange loadState(StateManager stateManager) throws IOException {
+        final StateMap stateMap = stateManager.getState(Scope.CLUSTER);
+
+        if (stateMap.getVersion() < 0) {
+            getLogger().debug("No previous state found");
+            return null;
+        }
+
+        final String earliest = stateMap.get(EARLIEST_TIME_KEY);
+        final String latest = stateMap.get(LATEST_TIME_KEY);
+        getLogger().debug("Loaded state with earliestTime of {} and latestTime of {}", new Object[] {earliest, latest});
+
+        if (StringUtils.isBlank(earliest) && StringUtils.isBlank(latest)) {
+            return null;
+        } else {
+            return new TimeRange(earliest, latest);
+        }
+    }
+
+    static class TimeRange {
+
+        final String earliestTime;
+        final String latestTime;
+
+        public TimeRange(String earliestTime, String latestTime) {
+            this.earliestTime = earliestTime;
+            this.latestTime = latestTime;
+        }
+
+        public String getEarliestTime() {
+            return earliestTime;
+        }
+
+        public String getLatestTime() {
+            return latestTime;
+        }
+
+    }
+
+}
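
A minimal standalone sketch of the managed time-range hand-off implemented above. The
StateManager persistence is replaced by local variables and the class name is
illustrative; only the date pattern and the latest-becomes-earliest rule come from
the processor:

    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class TimeRangeHandOffSketch {
        // the same pattern GetSplunk uses for its managed ranges
        private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";

        public static void main(String[] args) throws InterruptedException {
            final SimpleDateFormat fmt = new SimpleDateFormat(DATE_TIME_FORMAT);

            // first execution, no previous state: "Managed from Beginning" leaves the
            // earliest time null so all records are searched; "Managed from Current"
            // would set earliest to this same timestamp, save it, and skip the query
            String earliest = null;
            String latest = fmt.format(new Date());
            System.out.println("run 1: earliest=" + earliest + ", latest=" + latest);

            Thread.sleep(10); // stand-in for the time between scheduled executions

            // every later execution: the previous latest time becomes the new earliest
            earliest = latest;
            latest = fmt.format(new Date());
            System.out.println("run 2: earliest=" + earliest + ", latest=" + latest);
        }
    }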

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
new file mode 100644
index 0000000..482b85d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
+import org.apache.nifi.processor.util.put.sender.ChannelSender;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.stream.io.ByteCountingInputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
+import org.apache.nifi.util.LongHolder;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "tcp", "udp"})
+@TriggerWhenEmpty // because we have a queue of sessions that are ready to be committed
+@CapabilityDescription("Sends logs to Splunk Enterprise over TCP, TCP + TLS/SSL, or UDP. If a Message " +
+        "Delimiter is provided, then this processor will read messages from the incoming FlowFile based on the " +
+        "delimiter, and send each message to Splunk. If a Message Delimiter is not provided then the content of " +
+        "the FlowFile will be sent directly to Splunk as if it were a single message.")
+public class PutSplunk extends AbstractPutEventProcessor {
+
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
+                    "messages will be sent over a secure connection.")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    public static final char NEW_LINE_CHAR = '\n';
+
+    @Override
+    protected List<PropertyDescriptor> getAdditionalProperties() {
+        return Arrays.asList(
+                PROTOCOL,
+                MESSAGE_DELIMITER,
+                SSL_CONTEXT_SERVICE
+        );
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+        final Collection<ValidationResult> results = new ArrayList<>();
+
+        final String protocol = context.getProperty(PROTOCOL).getValue();
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
+        if (UDP_VALUE.getValue().equals(protocol) && sslContextService != null) {
+            results.add(new ValidationResult.Builder()
+                    .explanation("SSL can not be used with UDP")
+                    .valid(false).subject("SSL Context").build());
+        }
+
+        return results;
+    }
+
+    @OnStopped
+    public void cleanup() {
+        for (final FlowFileMessageBatch batch : activeBatches) {
+            batch.cancelOrComplete();
+        }
+
+        FlowFileMessageBatch batch;
+        while ((batch = completeBatches.poll()) != null) {
+            batch.completeSession();
+        }
+    }
+
+    @Override
+    protected String createTransitUri(ProcessContext context) {
+        final String port = context.getProperty(PORT).getValue();
+        final String host = context.getProperty(HOSTNAME).getValue();
+        final String protocol = context.getProperty(PROTOCOL).getValue().toLowerCase();
+        return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
+    }
+
+    @Override
+    protected ChannelSender createSender(ProcessContext context) throws IOException {
+        final int port = context.getProperty(PORT).asInteger();
+        final String host = context.getProperty(HOSTNAME).getValue();
+        final String protocol = context.getProperty(PROTOCOL).getValue();
+        final int timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        final int maxSendBuffer = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
+        SSLContext sslContext = null;
+        if (sslContextService != null) {
+            sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
+        }
+
+        return createSender(protocol, host, port, timeout, maxSendBuffer, sslContext);
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+        // first complete any batches from previous executions
+        FlowFileMessageBatch batch;
+        while ((batch = completeBatches.poll()) != null) {
+            batch.completeSession();
+        }
+
+        // create a session and try to get a FlowFile, if none available then close any idle senders
+        final ProcessSession session = sessionFactory.createSession();
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
+            context.yield();
+            return;
+        }
+
+        // get a sender from the pool, or create a new one if the pool is empty
+        // if we can't create a new connection then route flow files to failure and yield
+        ChannelSender sender = senderPool.poll();
+        if (sender == null) {
+            try {
+                getLogger().debug("No available connections, creating a new one...");
+                sender = createSender(context);
+            } catch (IOException e) {
+                getLogger().error("No available connections, and unable to create a new one, transferring {} to failure",
+                        new Object[]{flowFile}, e);
+                session.transfer(flowFile, REL_FAILURE);
+                context.yield();
+                return;
+            }
+        }
+
+        try {
+            String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
+            if (delimiter != null) {
+                delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
+            }
+
+            // if no delimiter then treat the whole FlowFile as a single message
+            if (delimiter == null) {
+                processSingleMessage(context, session, flowFile, sender);
+            } else {
+                processDelimitedMessages(context, session, flowFile, sender, delimiter);
+            }
+
+        } finally {
+            // if the connection is still open and no IO errors happened then try to return, if pool is full then close
+            if (sender.isConnected()) {
+                boolean returned = senderPool.offer(sender);
+                if (!returned) {
+                    sender.close();
+                }
+            } else {
+                // probably already closed here, but quietly close anyway to be safe
+                sender.close();
+            }
+
+        }
+    }
+
+    /**
+     * Send the entire FlowFile as a single message.
+     */
+    private void processSingleMessage(ProcessContext context, ProcessSession session, FlowFile flowFile, ChannelSender sender) {
+        // copy the contents of the FlowFile to the ByteArrayOutputStream
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream((int)flowFile.getSize() + 1);
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(final InputStream in) throws IOException {
+                StreamUtils.copy(in, baos);
+            }
+        });
+
+        // if TCP and we don't end in a new line then add one
+        final String protocol = context.getProperty(PROTOCOL).getValue();
+        if (protocol.equals(TCP_VALUE.getValue())) {
+            final byte[] buf = baos.getUnderlyingBuffer();
+            if (baos.size() > 0 && buf[baos.size() - 1] != NEW_LINE_CHAR) {
+                baos.write(NEW_LINE_CHAR);
+            }
+        }
+
+        // create a message batch of one message and add to active batches
+        final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile);
+        messageBatch.setNumMessages(1);
+        activeBatches.add(messageBatch);
+
+        // attempt to send the data and add the appropriate range
+        try {
+            sender.send(baos.toByteArray());
+            messageBatch.addSuccessfulRange(0L, flowFile.getSize());
+        } catch (IOException e) {
+            messageBatch.addFailedRange(0L, flowFile.getSize(), e);
+            context.yield();
+        }
+    }
+
+    /**
+     * Read delimited messages from the FlowFile tracking which messages are sent successfully.
+     */
+    private void processDelimitedMessages(final ProcessContext context, final ProcessSession session, final FlowFile flowFile,
+                                          final ChannelSender sender, final String delimiter) {
+
+        final String protocol = context.getProperty(PROTOCOL).getValue();
+        final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8);
+
+        // The NonThreadSafeCircularBuffer allows us to add a byte from the stream one at a time and see if it matches
+        // some pattern. We can use this to search for the delimiter as we read through the stream of bytes in the FlowFile
+        final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes);
+
+        final LongHolder messagesSent = new LongHolder(0L);
+        final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile);
+        activeBatches.add(messageBatch);
+
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream rawIn) throws IOException {
+                    byte[] data = null; // contents of a single message
+                    boolean streamFinished = false;
+
+                    int nextByte;
+                    try (final InputStream bufferedIn = new BufferedInputStream(rawIn);
+                         final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
+
+                        long messageStartOffset = in.getBytesConsumed();
+
+                        // read until we're out of data.
+                        while (!streamFinished) {
+                            nextByte = in.read();
+
+                            if (nextByte > -1) {
+                                baos.write(nextByte);
+                            }
+
+                            if (nextByte == -1) {
+                                // we ran out of data. This message is complete.
+                                data = getMessage(baos, baos.size(), protocol);
+                                streamFinished = true;
+                            } else if (buffer.addAndCompare((byte) nextByte)) {
+                                // we matched our delimiter. This message is complete. We want all of the bytes from the
+                                // underlying BAOS except for the last 'delimiterBytes.length' bytes because we don't want
+                                // the delimiter itself to be sent.
+                                data = getMessage(baos, baos.size() - delimiterBytes.length, protocol);
+                            }
+
+                            if (data != null) {
+                                final long messageEndOffset = in.getBytesConsumed();
+
+                                // If the message has no data, ignore it.
+                                if (data.length != 0) {
+                                    final long rangeStart = messageStartOffset;
+                                    try {
+                                        sender.send(data);
+                                        messageBatch.addSuccessfulRange(rangeStart, messageEndOffset);
+                                        messagesSent.incrementAndGet();
+                                    } catch (final IOException e) {
+                                        messageBatch.addFailedRange(rangeStart, messageEndOffset, e);
+                                    }
+                                }
+
+                                // reset BAOS so that we can start a new message.
+                                baos.reset();
+                                data = null;
+                                messageStartOffset = in.getBytesConsumed();
+                            }
+                        }
+                    }
+                }
+            });
+
+            messageBatch.setNumMessages(messagesSent.get());
+        }
+    }
+
+    /**
+     * Helper to get the bytes of a message from the ByteArrayOutputStream, factoring in whether we need
+     * a new line at the end of our message.
+     *
+     * @param baos the ByteArrayOutputStream to get data from
+     * @param length the amount of data to copy from the baos
+     * @param protocol the protocol (TCP or UDP)
+     *
+     * @return the bytes from 0 to length, including a new line if the protocol was TCP
+     */
+    private byte[] getMessage(final ByteArrayOutputStream baos, final int length, final String protocol) {
+        if (baos.size() == 0) {
+            return null;
+        }
+
+        final byte[] buf = baos.getUnderlyingBuffer();
+
+        // if TCP and the message doesn't already end with a new line then add one
+        if (protocol.equals(TCP_VALUE.getValue()) && length > 0 && buf[length - 1] != NEW_LINE_CHAR) {
+            final byte[] message = Arrays.copyOf(buf, length + 1);
+            message[length] = NEW_LINE_CHAR;
+            return message;
+        } else {
+            return Arrays.copyOfRange(buf, 0, length);
+        }
+    }
+
+}
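
The delimiter scan above leans on NiFi's NonThreadSafeCircularBuffer to match the
delimiter one byte at a time without re-copying the accumulated message. The sketch
below reproduces the same splitting behavior with a deliberately simple (and slower)
endsWith check; everything outside java.* is illustrative:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class DelimiterScanSketch {

        // true if the last delimiter.length bytes written to baos equal the delimiter;
        // toByteArray() copies on every call, which the circular buffer avoids
        static boolean endsWith(ByteArrayOutputStream baos, byte[] delimiter) {
            if (baos.size() < delimiter.length) {
                return false;
            }
            final byte[] data = baos.toByteArray();
            for (int i = 0; i < delimiter.length; i++) {
                if (data[data.length - delimiter.length + i] != delimiter[i]) {
                    return false;
                }
            }
            return true;
        }

        public static void main(String[] args) throws IOException {
            final byte[] delimiter = "\n".getBytes(StandardCharsets.UTF_8);
            final InputStream in = new ByteArrayInputStream("one\ntwo\nthree".getBytes(StandardCharsets.UTF_8));

            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            int next;
            while ((next = in.read()) != -1) {
                baos.write(next);
                if (endsWith(baos, delimiter)) {
                    // emit everything except the delimiter itself, as PutSplunk does
                    final byte[] message = Arrays.copyOfRange(baos.toByteArray(), 0, baos.size() - delimiter.length);
                    System.out.println("message: " + new String(message, StandardCharsets.UTF_8));
                    baos.reset();
                }
            }
            if (baos.size() > 0) {
                // the stream ended without a trailing delimiter; flush the remainder
                System.out.println("message: " + new String(baos.toByteArray(), StandardCharsets.UTF_8));
            }
        }
    }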

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..4612039
--- /dev/null
+++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.splunk.GetSplunk
+org.apache.nifi.processors.splunk.PutSplunk
\ No newline at end of file
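
The two entries above are what make the processors discoverable: NiFi locates
Processor implementations through the standard java.util.ServiceLoader mechanism,
which reads exactly this kind of META-INF/services file. A sketch of that lookup,
assuming the processors are on the classpath:

    import java.util.ServiceLoader;

    import org.apache.nifi.processor.Processor;

    public class DiscoverySketch {
        public static void main(String[] args) {
            // iterates every Processor named in a META-INF/services file on the
            // classpath, which would include GetSplunk and PutSplunk from above
            for (Processor p : ServiceLoader.load(Processor.class)) {
                System.out.println(p.getClass().getName());
            }
        }
    }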

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestGetSplunk.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestGetSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestGetSplunk.java
new file mode 100644
index 0000000..42daab6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestGetSplunk.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.splunk.JobExportArgs;
+import com.splunk.Service;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestGetSplunk {
+
+    private Service service;
+    private TestableGetSplunk proc;
+    private TestRunner runner;
+
+    @Before
+    public void setup() {
+        service = Mockito.mock(Service.class);
+        proc = new TestableGetSplunk(service);
+
+        runner = TestRunners.newTestRunner(proc);
+    }
+
+    @Test
+    public void testCustomValidation() {
+        final String query = "search tcp:7879";
+        final String providedEarliest = "-1h";
+        final String providedLatest = "now";
+        final String outputMode = GetSplunk.ATOM_VALUE.getValue();
+
+        runner.setProperty(GetSplunk.QUERY, query);
+        runner.setProperty(GetSplunk.EARLIEST_TIME, providedEarliest);
+        runner.setProperty(GetSplunk.LATEST_TIME, providedLatest);
+        runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode);
+        runner.assertValid();
+
+        runner.setProperty(GetSplunk.USERNAME, "user1");
+        runner.assertNotValid();
+
+        runner.setProperty(GetSplunk.PASSWORD, "password");
+        runner.assertValid();
+    }
+
+    @Test
+    public void testGetWithProvidedTime() {
+        final String query = "search tcp:7879";
+        final String providedEarliest = "-1h";
+        final String providedLatest = "now";
+        final String outputMode = GetSplunk.ATOM_VALUE.getValue();
+
+        runner.setProperty(GetSplunk.QUERY, query);
+        runner.setProperty(GetSplunk.EARLIEST_TIME, providedEarliest);
+        runner.setProperty(GetSplunk.LATEST_TIME, providedLatest);
+        runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode);
+
+        final JobExportArgs expectedArgs = new JobExportArgs();
+        expectedArgs.setSearchMode(JobExportArgs.SearchMode.NORMAL);
+        expectedArgs.setEarliestTime(providedEarliest);
+        expectedArgs.setLatestTime(providedLatest);
+        expectedArgs.setOutputMode(JobExportArgs.OutputMode.valueOf(outputMode));
+
+        final String resultContent = "fake results";
+        final ByteArrayInputStream input = new ByteArrayInputStream(resultContent.getBytes(StandardCharsets.UTF_8));
+        when(service.export(eq(query), argThat(new JobExportArgsMatcher(expectedArgs)))).thenReturn(input);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 1);
+
+        final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(GetSplunk.REL_SUCCESS);
+        Assert.assertEquals(1, mockFlowFiles.size());
+
+        final MockFlowFile mockFlowFile = mockFlowFiles.get(0);
+        mockFlowFile.assertContentEquals(resultContent);
+        mockFlowFile.assertAttributeEquals(GetSplunk.QUERY_ATTR, query);
+        mockFlowFile.assertAttributeEquals(GetSplunk.EARLIEST_TIME_ATTR, providedEarliest);
+        mockFlowFile.assertAttributeEquals(GetSplunk.LATEST_TIME_ATTR, providedLatest);
+        Assert.assertEquals(1, proc.count);
+
+        final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
+        Assert.assertEquals(1, events.size());
+        Assert.assertEquals(ProvenanceEventType.RECEIVE, events.get(0).getEventType());
+        Assert.assertEquals("https://localhost:8089", events.get(0).getTransitUri());
+    }
+
+    @Test
+    public void testMultipleIterationsWithoutShuttingDown() {
+        final String query = "search tcp:7879";
+        final String providedEarliest = "-1h";
+        final String providedLatest = "now";
+        final String outputMode = GetSplunk.ATOM_VALUE.getValue();
+
+        runner.setProperty(GetSplunk.QUERY, query);
+        runner.setProperty(GetSplunk.EARLIEST_TIME, providedEarliest);
+        runner.setProperty(GetSplunk.LATEST_TIME, providedLatest);
+        runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode);
+
+        final JobExportArgs expectedArgs = new JobExportArgs();
+        expectedArgs.setSearchMode(JobExportArgs.SearchMode.NORMAL);
+        expectedArgs.setEarliestTime(providedEarliest);
+        expectedArgs.setLatestTime(providedLatest);
+        expectedArgs.setOutputMode(JobExportArgs.OutputMode.valueOf(outputMode));
+
+        final String resultContent = "fake results";
+        final ByteArrayInputStream input = new ByteArrayInputStream(resultContent.getBytes(StandardCharsets.UTF_8));
+        when(service.export(eq(query), argThat(new JobExportArgsMatcher(expectedArgs)))).thenReturn(input);
+
+        final int iterations = 3;
+        runner.run(iterations, false);
+        runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, iterations);
+        Assert.assertEquals(1, proc.count);
+    }
+
+    @Test
+    public void testGetWithManagedFromBeginning() {
+        final String query = "search tcp:7879";
+        final String outputMode = GetSplunk.ATOM_VALUE.getValue();
+
+        runner.setProperty(GetSplunk.QUERY, query);
+        runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode);
+        runner.setProperty(GetSplunk.TIME_RANGE_STRATEGY, GetSplunk.MANAGED_BEGINNING_VALUE.getValue());
+
+        final String resultContent = "fake results";
+        final ByteArrayInputStream input = new ByteArrayInputStream(resultContent.getBytes(StandardCharsets.UTF_8));
+        when(service.export(eq(query), any(JobExportArgs.class))).thenReturn(input);
+
+        // run once and don't shut down
+        runner.run(1, false);
+        runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 1);
+
+        // capture what the args were on last run
+        final ArgumentCaptor<JobExportArgs> capture1 = ArgumentCaptor.forClass(JobExportArgs.class);
+        verify(service, times(1)).export(eq(query), capture1.capture());
+
+        // first execution with no previous state and "managed from beginning" should have a latest time and no earliest time
+        final JobExportArgs actualArgs1 = capture1.getValue();
+        Assert.assertNotNull(actualArgs1);
+        Assert.assertNull(actualArgs1.get("earliest_time"));
+        Assert.assertNotNull(actualArgs1.get("latest_time"));
+
+        // save the latest time from the first run which should be earliest time of next run
+        final String expectedLatest = (String) actualArgs1.get("latest_time");
+
+        // run again
+        runner.run(1, false);
+        runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 2);
+
+        final ArgumentCaptor<JobExportArgs> capture2 = ArgumentCaptor.forClass(JobExportArgs.class);
+        verify(service, times(2)).export(eq(query), capture2.capture());
+
+        // second execution the earliest time should be the previous latest_time
+        final JobExportArgs actualArgs2 = capture2.getValue();
+        Assert.assertNotNull(actualArgs2);
+        Assert.assertEquals(expectedLatest, actualArgs2.get("earliest_time"));
+        Assert.assertNotNull(actualArgs2.get("latest_time"));
+    }
+
+    @Test
+    public void testGetWithManagedFromCurrent() throws IOException {
+        final String query = "search tcp:7879";
+        final String outputMode = GetSplunk.ATOM_VALUE.getValue();
+
+        runner.setProperty(GetSplunk.QUERY, query);
+        runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode);
+        runner.setProperty(GetSplunk.TIME_RANGE_STRATEGY, GetSplunk.MANAGED_CURRENT_VALUE.getValue());
+
+        final String resultContent = "fake results";
+        final ByteArrayInputStream input = new ByteArrayInputStream(resultContent.getBytes(StandardCharsets.UTF_8));
+        when(service.export(eq(query), any(JobExportArgs.class))).thenReturn(input);
+
+        // run once and don't shut down, shouldn't produce any results first time
+        runner.run(1, false);
+        runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 0);
+
+        // capture what the args were on last run
+        verify(service, times(0)).export(eq(query), any(JobExportArgs.class));
+
+        final StateMap state = runner.getStateManager().getState(Scope.CLUSTER);
+        Assert.assertNotNull(state);
+        Assert.assertTrue(state.getVersion() > 0);
+
+        // save the latest time from the first run which should be earliest time of next run
+        final String expectedLatest = state.get(GetSplunk.LATEST_TIME_KEY);
+
+        // run again
+        runner.run(1, false);
+        runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 1);
+
+        final ArgumentCaptor<JobExportArgs> capture = ArgumentCaptor.forClass(JobExportArgs.class);
+        verify(service, times(1)).export(eq(query), capture.capture());
+
+        // on the second execution, the earliest time should be the previous latest_time
+        final JobExportArgs actualArgs = capture.getValue();
+        Assert.assertNotNull(actualArgs);
+        Assert.assertEquals(expectedLatest, actualArgs.get("earliest_time"));
+        Assert.assertNotNull(actualArgs.get("latest_time"));
+    }
+
+
+    /**
+     * Testable implementation of GetSplunk that returns a mock Splunk Service.
+     */
+    private static class TestableGetSplunk extends GetSplunk {
+
+        int count;
+        Service mockService;
+
+        public TestableGetSplunk(Service mockService) {
+            this.mockService = mockService;
+        }
+
+        @Override
+        protected Service createSplunkService(ProcessContext context) {
+            count++;
+            return mockService;
+        }
+    }
+
+    /**
+     * Custom args matcher for JobExportArgs.
+     */
+    private static class JobExportArgsMatcher extends ArgumentMatcher<JobExportArgs> {
+
+        private JobExportArgs expected;
+
+        public JobExportArgsMatcher(JobExportArgs expected) {
+            this.expected = expected;
+        }
+
+        @Override
+        public boolean matches(Object o) {
+            // instanceof is false for null, so no separate null check is needed
+            if (!(o instanceof JobExportArgs)) {
+                return false;
+            }
+            return expected.equals(o);
+        }
+
+    }
+
+}
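
For reference, the managed time-range handoff the tests above assert can be
sketched as follows. This is illustrative only: LATEST_TIME_KEY and the arg
keys match the test code, but the class, method, and date format are
assumptions, not GetSplunk's actual internals.

    public class ManagedTimeRangeSketch {
        // Build export args so the previous run's latest_time becomes the
        // next run's earliest_time; a null previousLatest models the first
        // "managed from beginning" run, which has no earliest_time.
        public static com.splunk.JobExportArgs buildExportArgs(final String previousLatest) {
            final com.splunk.JobExportArgs exportArgs = new com.splunk.JobExportArgs();
            if (previousLatest != null) {
                exportArgs.put("earliest_time", previousLatest);
            }
            // close the window at "now" (the format here is an assumption)
            final String now = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
                    .format(new java.util.Date());
            exportArgs.put("latest_time", now);
            return exportArgs;
        }
    }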

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java
new file mode 100644
index 0000000..bb55372
--- /dev/null
+++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.util.put.sender.ChannelSender;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class TestPutSplunk {
+
+    private TestRunner runner;
+    private TestablePutSplunk proc;
+    private CapturingChannelSender sender;
+
+    @Before
+    public void init() {
+        ProcessorLog logger = Mockito.mock(ProcessorLog.class);
+        sender = new CapturingChannelSender("localhost", 12345, 0, logger);
+        proc = new TestablePutSplunk(sender);
+
+        runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutSplunk.PORT, "12345");
+    }
+
+    @Test
+    public void testUDPSendWholeFlowFile() {
+        final String message = "This is one message, should send the whole FlowFile";
+
+        runner.enqueue(message);
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1);
+
+        final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
+        mockFlowFile.assertContentEquals(message);
+
+        Assert.assertEquals(1, sender.getMessages().size());
+        Assert.assertEquals(message, sender.getMessages().get(0));
+    }
+
+    @Test
+    public void testTCPSendWholeFlowFile() {
+        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
+
+        final String message = "This is one message, should send the whole FlowFile";
+
+        runner.enqueue(message);
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1);
+
+        final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
+        mockFlowFile.assertContentEquals(message);
+
+        Assert.assertEquals(1, sender.getMessages().size());
+        Assert.assertEquals(message + "\n", sender.getMessages().get(0));
+    }
+
+    @Test
+    public void testTCPSendWholeFlowFileAlreadyHasNewLine() {
+        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
+
+        final String message = "This is one message, should send the whole FlowFile\n";
+
+        runner.enqueue(message);
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1);
+
+        final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
+        mockFlowFile.assertContentEquals(message);
+
+        Assert.assertEquals(1, sender.getMessages().size());
+        Assert.assertEquals(message, sender.getMessages().get(0));
+    }
+
+    @Test
+    public void testUDPSendDelimitedMessages() {
+        final String delimiter = "DD";
+        runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
+
+        final String message = "This is message 1DDThis is message 2DDThis is message 3";
+
+        runner.enqueue(message);
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1);
+
+        final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
+        mockFlowFile.assertContentEquals(message);
+
+        Assert.assertEquals(3, sender.getMessages().size());
+        Assert.assertEquals("This is message 1", sender.getMessages().get(0));
+        Assert.assertEquals("This is message 2", sender.getMessages().get(1));
+        Assert.assertEquals("This is message 3", sender.getMessages().get(2));
+    }
+
+    @Test
+    public void testTCPSendDelimitedMessages() {
+        final String delimiter = "DD";
+        runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
+        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
+
+        // no delimiter at end
+        final String message = "This is message 1DDThis is message 2DDThis is message 3";
+
+        runner.enqueue(message);
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1);
+
+        final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
+        mockFlowFile.assertContentEquals(message);
+
+        Assert.assertEquals(3, sender.getMessages().size());
+        Assert.assertEquals("This is message 1\n", sender.getMessages().get(0));
+        Assert.assertEquals("This is message 2\n", sender.getMessages().get(1));
+        Assert.assertEquals("This is message 3\n", sender.getMessages().get(2));
+    }
+
+    @Test
+    public void testTCPSendDelimitedMessagesWithEL() {
+        final String delimiter = "DD";
+        runner.setProperty(PutSplunk.MESSAGE_DELIMITER, "${flow.file.delim}");
+        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
+
+        // no delimiter at end
+        final String message = "This is message 1DDThis is message 2DDThis is message 3";
+
+        final Map<String,String> attrs = new HashMap<>();
+        attrs.put("flow.file.delim", delimiter);
+
+        runner.enqueue(message, attrs);
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1);
+
+        final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
+        mockFlowFile.assertContentEquals(message);
+
+        Assert.assertEquals(3, sender.getMessages().size());
+        Assert.assertEquals("This is message 1\n", sender.getMessages().get(0));
+        Assert.assertEquals("This is message 2\n", sender.getMessages().get(1));
+        Assert.assertEquals("This is message 3\n", sender.getMessages().get(2));
+    }
+
+    @Test
+    public void testTCPSendDelimitedMessagesEndsWithDelimiter() {
+        final String delimiter = "DD";
+        runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
+        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
+
+        // delimiter at end
+        final String message = "This is message 1DDThis is message 2DDThis is message 3DD";
+
+        runner.enqueue(message);
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1);
+
+        final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
+        mockFlowFile.assertContentEquals(message);
+
+        Assert.assertEquals(3, sender.getMessages().size());
+        Assert.assertEquals("This is message 1\n", sender.getMessages().get(0));
+        Assert.assertEquals("This is message 2\n", sender.getMessages().get(1));
+        Assert.assertEquals("This is message 3\n", sender.getMessages().get(2));
+    }
+
+    @Test
+    public void testTCPSendDelimitedMessagesWithNewLineDelimiter() {
+        final String delimiter = "\\n";
+        runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
+        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
+
+        final String message = "This is message 1\nThis is message 2\nThis is message 3";
+
+        runner.enqueue(message);
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1);
+
+        final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
+        mockFlowFile.assertContentEquals(message);
+
+        Assert.assertEquals(3, sender.getMessages().size());
+        Assert.assertEquals("This is message 1\n", sender.getMessages().get(0));
+        Assert.assertEquals("This is message 2\n", sender.getMessages().get(1));
+        Assert.assertEquals("This is message 3\n", sender.getMessages().get(2));
+    }
+
+    @Test
+    public void testTCPSendDelimitedMessagesWithErrors() {
+        sender.setErrorStart(3);
+        sender.setErrorEnd(4);
+
+        final String delimiter = "DD";
+        runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
+        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
+
+        // no delimiter at end
+        final String success = "This is message 1DDThis is message 2DD";
+        final String failure = "This is message 3DDThis is message 4";
+        final String message = success + failure;
+
+        runner.enqueue(message);
+        runner.run(1);
+        runner.assertTransferCount(PutSplunk.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutSplunk.REL_FAILURE, 1);
+
+        // the first two messages should have gone to success
+        final MockFlowFile successFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
+        successFlowFile.assertContentEquals(success);
+
+        // the last two messages should have gone to failure
+        final MockFlowFile failureFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_FAILURE).get(0);
+        failureFlowFile.assertContentEquals(failure);
+
+        // the sender should only have received the first two messages
+        Assert.assertEquals(2, sender.getMessages().size());
+        Assert.assertEquals("This is message 1\n", sender.getMessages().get(0));
+        Assert.assertEquals("This is message 2\n", sender.getMessages().get(1));
+    }
+
+    @Test
+    public void testTCPSendDelimitedMessagesWithErrorsInMiddle() {
+        sender.setErrorStart(3);
+        sender.setErrorEnd(4);
+
+        final String delimiter = "DD";
+        runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
+        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
+
+        // no delimiter at end
+        final String success = "This is message 1DDThis is message 2DD";
+        final String failure = "This is message 3DDThis is message 4DD";
+        final String success2 = "This is message 5DDThis is message 6DDThis is message 7DD";
+        final String message = success + failure + success2;
+
+        runner.enqueue(message);
+        runner.run(1);
+        runner.assertTransferCount(PutSplunk.REL_SUCCESS, 2);
+        runner.assertTransferCount(PutSplunk.REL_FAILURE, 1);
+
+        // the first two messages should have gone to success
+        final MockFlowFile successFlowFile1 = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
+        successFlowFile1.assertContentEquals(success);
+
+        // the last three messages should have gone to success
+        final MockFlowFile successFlowFile2 = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(1);
+        successFlowFile2.assertContentEquals(success2);
+
+        // the middle two messages should have gone to failure
+        final MockFlowFile failureFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_FAILURE).get(0);
+        failureFlowFile.assertContentEquals(failure);
+
+        // the sender should have received the five successful messages
+        Assert.assertEquals(5, sender.getMessages().size());
+        Assert.assertEquals("This is message 1\n", sender.getMessages().get(0));
+        Assert.assertEquals("This is message 2\n", sender.getMessages().get(1));
+        Assert.assertEquals("This is message 5\n", sender.getMessages().get(2));
+        Assert.assertEquals("This is message 6\n", sender.getMessages().get(3));
+        Assert.assertEquals("This is message 7\n", sender.getMessages().get(4));
+    }
+
+    @Test
+    public void testCompletingPreviousBatchOnNextExecution() {
+        final String message = "This is one message, should send the whole FlowFile";
+
+        runner.enqueue(message);
+        runner.run(2, false); // don't shut down, to prove that the next onTrigger completes the previous batch
+        runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1);
+
+        final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
+        mockFlowFile.assertContentEquals(message);
+
+        Assert.assertEquals(1, sender.getMessages().size());
+        Assert.assertEquals(message, sender.getMessages().get(0));
+    }
+
+    /**
+     * Extend PutSplunk to use a CapturingChannelSender.
+     */
+    private static class TestablePutSplunk extends PutSplunk {
+
+        private ChannelSender sender;
+
+        public TestablePutSplunk(ChannelSender channelSender) {
+            this.sender = channelSender;
+        }
+
+        @Override
+        protected ChannelSender createSender(String protocol, String host, int port, int timeout, int maxSendBufferSize, SSLContext sslContext) throws IOException {
+            return sender;
+        }
+    }
+
+    /**
+     * A ChannelSender that captures each message that was sent.
+     */
+    private static class CapturingChannelSender extends ChannelSender {
+
+        private List<String> messages = new ArrayList<>();
+        private int count = 0;
+        private int errorStart = -1;
+        private int errorEnd = -1;
+
+        public CapturingChannelSender(String host, int port, int maxSendBufferSize, ProcessorLog logger) {
+            super(host, port, maxSendBufferSize, logger);
+        }
+
+        @Override
+        public void open() throws IOException {
+
+        }
+
+        @Override
+        protected void write(byte[] data) throws IOException {
+            count++;
+            if (errorStart > 0 && count >= errorStart && errorEnd > 0 && count <= errorEnd) {
+                throw new IOException("this is an error");
+            }
+            messages.add(new String(data, StandardCharsets.UTF_8));
+        }
+
+        @Override
+        public boolean isConnected() {
+            return false;
+        }
+
+        @Override
+        public void close() {
+
+        }
+
+        public List<String> getMessages() {
+            return messages;
+        }
+
+        public void setErrorStart(int errorStart) {
+            this.errorStart = errorStart;
+        }
+
+        public void setErrorEnd(int errorEnd) {
+            this.errorEnd = errorEnd;
+        }
+    }
+
+}
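
The TCP assertions above hinge on one framing rule: each outgoing message is
terminated with a newline, appended only when the message does not already end
with one. A minimal sketch of that rule (the helper name is hypothetical):

    // Append the newline terminator the TCP tests expect, unless the
    // message already ends with one.
    static byte[] frameForTcp(final String message) {
        final String framed = message.endsWith("\n") ? message : message + "\n";
        return framed.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }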

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/util/LogGenerator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/util/LogGenerator.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/util/LogGenerator.java
new file mode 100644
index 0000000..309893b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/util/LogGenerator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk.util;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+public class LogGenerator {
+
+    static final String LOG_MESSAGE = "This is log message # %s";
+
+    private final int numLogs;
+    private final String delimiter;
+
+    public LogGenerator(int numLogs, String delimiter) {
+        this.numLogs = numLogs;
+        this.delimiter = delimiter;
+    }
+
+    public void generate(final File file) throws IOException {
+        try (OutputStream rawOut = new FileOutputStream(file);
+             BufferedOutputStream out = new BufferedOutputStream(rawOut)) {
+
+            for (int i = 0; i < numLogs; i++) {
+                if (i > 0) {
+                    out.write(delimiter.getBytes(StandardCharsets.UTF_8));
+                }
+
+                final String message = String.format(LOG_MESSAGE, i);
+                out.write(message.getBytes(StandardCharsets.UTF_8));
+            }
+
+            System.out.println("Done generating " + numLogs + " messages");
+            out.flush();
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+        if (args == null || args.length != 3) {
+            System.err.println("USAGE: LogGenerator <num_logs> <delimiter> <file>");
+            System.exit(1);
+        }
+
+        final int numLogs = Integer.parseInt(args[0]);
+        final String delim = args[1];
+
+        final File file = new File(args[2]);
+        if (file.exists()) {
+            file.delete();
+        }
+
+        final LogGenerator generator = new LogGenerator(numLogs, delim);
+        generator.generate(file);
+    }
+}
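
LogGenerator can also be driven programmatically; a minimal example, with an
illustrative output path:

    // Write ten newline-delimited messages of the form "This is log message # i".
    final LogGenerator generator = new LogGenerator(10, "\n");
    generator.generate(new java.io.File("/tmp/splunk-test.log"));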

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-splunk-bundle/pom.xml b/nifi-nar-bundles/nifi-splunk-bundle/pom.xml
new file mode 100644
index 0000000..daab02f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-splunk-bundle/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>0.6.0-SNAPSHOT</version>
+    </parent>
+
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-splunk-bundle</artifactId>
+    <version>0.6.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>nifi-splunk-processors</module>
+        <module>nifi-splunk-nar</module>
+    </modules>
+
+    <repositories>
+        <repository>
+            <id>splunk</id>
+            <name>Splunk Artifactory</name>
+            <url>http://splunk.artifactoryonline.com/splunk/ext-releases-local/</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>com.splunk</groupId>
+                <artifactId>splunk</artifactId>
+                <version>1.5.0.0</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+</project>
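
With the Splunk SDK version pinned in dependencyManagement above, child modules
such as nifi-splunk-processors can declare the dependency without repeating the
version; a sketch of that declaration (illustrative, not part of this diff):

    <dependency>
        <groupId>com.splunk</groupId>
        <artifactId>splunk</artifactId>
        <!-- version 1.5.0.0 inherited from the parent's dependencyManagement -->
    </dependency>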

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
index e51ba6c..a8a6bef 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
@@ -49,6 +49,14 @@ public abstract class AbstractSyslogProcessor extends AbstractProcessor {
             .defaultValue("UTF-8")
             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
             .build();
+    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Timeout")
+            .description("The timeout for connecting to and communicating with the syslog server. Does not apply to UDP")
+            .required(false)
+            .defaultValue("10 seconds")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
 
 
 }
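
Subclasses can resolve the new TIMEOUT property to a number of milliseconds via
NiFi's time-period parsing; a minimal sketch, assuming a ProcessContext named
context is in scope:

    // asTimePeriod() parses values like "10 seconds", per TIME_PERIOD_VALIDATOR.
    final int timeoutMillis = context.getProperty(TIMEOUT)
            .asTimePeriod(java.util.concurrent.TimeUnit.MILLISECONDS)
            .intValue();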

