flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [2/2] flink git commit: [FLINK-2740] Adding flink-connector-nifi module with NiFiSource and NiFiSink
Date Mon, 05 Oct 2015 01:55:12 GMT
[FLINK-2740] Adding flink-connector-nifi module with NiFiSource and NiFiSink

This closes #1198


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92fb06a1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92fb06a1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92fb06a1

Branch: refs/heads/master
Commit: 92fb06a1384cec0e659fcb7da4cd4c88394b27b1
Parents: 5466824
Author: Bryan Bende <bbende@HW11977.home>
Authored: Wed Sep 30 09:15:44 2015 -0400
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Sun Oct 4 19:17:09 2015 -0500

----------------------------------------------------------------------
 .../flink-connector-nifi/pom.xml                |  94 ++++++++++++
 .../connectors/nifi/NiFiDataPacket.java         |  39 +++++
 .../connectors/nifi/NiFiDataPacketBuilder.java  |  34 +++++
 .../streaming/connectors/nifi/NiFiSink.java     |  74 ++++++++++
 .../streaming/connectors/nifi/NiFiSource.java   | 146 +++++++++++++++++++
 .../connectors/nifi/StandardNiFiDataPacket.java |  46 ++++++
 .../nifi/examples/NiFiSinkTopologyExample.java  |  55 +++++++
 .../examples/NiFiSourceTopologyExample.java     |  58 ++++++++
 .../src/test/resources/NiFi_Flink.xml           |  16 ++
 .../flink-streaming-connectors/pom.xml          |   1 +
 10 files changed, 563 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/92fb06a1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/pom.xml
new file mode 100644
index 0000000..9168822
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-connectors-parent</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-nifi</artifactId>
+	<name>flink-connector-nifi</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<nifi.version>0.3.0</nifi.version>
+	</properties>
+
+	<dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-site-to-site-client</artifactId>
+            <version>${nifi.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-core</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-tests</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<rerunFailingTestsCount>3</rerunFailingTestsCount>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-failsafe-plugin</artifactId>
+				<configuration>
+					<rerunFailingTestsCount>3</rerunFailingTestsCount>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/92fb06a1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
new file mode 100644
index 0000000..c8ceb57
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import java.util.Map;
+
+/**
+ * <p>
+ * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both
+ * a FlowFile's content and its attributes so that they can be processed by Flink.
+ * </p>
+ */
+public interface NiFiDataPacket {
+
+	/**
+	 * @return the contents of a NiFi FlowFile
+	 */
+	byte[] getContent();
+
+	/**
+	 * @return a Map of attributes that are associated with the NiFi FlowFile
+	 */
+	Map<String, String> getAttributes();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/92fb06a1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
new file mode 100644
index 0000000..9bb521b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import java.io.Serializable;
+
+/**
+ * A function that can create a NiFiDataPacket from an incoming instance of the given type.
+ *
+ * @param <T>
+ */
+public interface NiFiDataPacketBuilder<T> extends Function, Serializable {
+
+	NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92fb06a1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
new file mode 100644
index 0000000..abc6b35
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+
+/**
+ * A sink that delivers data to Apache NiFi using the NiFi Site-to-Site client. The sink requires
+ * a NiFiDataPacketBuilder which can create instances of NiFiDataPacket from the incoming data.
+ */
+public class NiFiSink<T> extends RichSinkFunction<T> {
+
+	private SiteToSiteClient client;
+	private SiteToSiteClientConfig clientConfig;
+	private NiFiDataPacketBuilder<T> builder;
+
+	/**
+	 * Construct a new NiFiSink with the given client config and NiFiDataPacketBuilder.
+	 *
+	 * @param clientConfig the configuration for building a NiFi SiteToSiteClient
+	 * @param builder a builder to produce NiFiDataPackets from incoming data
+	 */
+	public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder<T> builder) {
+		this.clientConfig = clientConfig;
+		this.builder = builder;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		this.client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
+	}
+
+	@Override
+	public void invoke(T value) throws Exception {
+		final NiFiDataPacket niFiDataPacket = builder.createNiFiDataPacket(value, getRuntimeContext());
+
+		final Transaction transaction = client.createTransaction(TransferDirection.SEND);
+		if (transaction == null) {
+			throw new IllegalStateException("Unable to create a NiFi Transaction to send data");
+		}
+
+		transaction.send(niFiDataPacket.getContent(), niFiDataPacket.getAttributes());
+		transaction.confirm();
+		transaction.complete();
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		client.close();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92fb06a1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
new file mode 100644
index 0000000..a213bb4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A source that pulls data from Apache NiFi using the NiFi Site-to-Site client. This source
+ * produces NiFiDataPackets which encapsulate the content and attributes of a NiFi FlowFile.
+ */
+public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(NiFiSource.class);
+
+	private static final long DEFAULT_WAIT_TIME_MS = 1000;
+
+	private long waitTimeMs;
+	private SiteToSiteClient client;
+	private SiteToSiteClientConfig clientConfig;
+	private transient volatile boolean running;
+
+	/**
+	 * Constructs a new NiFiSource using the given client config and the default wait time of 1000 ms.
+	 *
+	 * @param clientConfig the configuration for building a NiFi SiteToSiteClient
+	 */
+	public NiFiSource(SiteToSiteClientConfig clientConfig) {
+		this(clientConfig, DEFAULT_WAIT_TIME_MS);
+	}
+
+	/**
+	 * Constructs a new NiFiSource using the given client config and wait time.
+	 *
+	 * @param clientConfig the configuration for building a NiFi SiteToSiteClient
+	 * @param waitTimeMs the amount of time to wait (in milliseconds) if no data is available to pull from NiFi
+	 */
+	public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs) {
+		this.clientConfig = clientConfig;
+		this.waitTimeMs = waitTimeMs;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
+		running = true;
+	}
+
+	@Override
+	public void run(SourceContext<NiFiDataPacket> ctx) throws Exception {
+		try {
+			while (running) {
+				final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
+				if (transaction == null) {
+					LOG.warn("A transaction could not be created, waiting and will try again...");
+					try {
+						Thread.sleep(waitTimeMs);
+					} catch (InterruptedException e) {
+
+					}
+					continue;
+				}
+
+				DataPacket dataPacket = transaction.receive();
+				if (dataPacket == null) {
+					transaction.confirm();
+					transaction.complete();
+
+					LOG.debug("No data available to pull, waiting and will try again...");
+					try {
+						Thread.sleep(waitTimeMs);
+					} catch (InterruptedException e) {
+
+					}
+					continue;
+				}
+
+				final List<NiFiDataPacket> niFiDataPackets = new ArrayList<>();
+				do {
+					// Read the data into a byte array and wrap it along with the attributes
+					// into a NiFiDataPacket.
+					final InputStream inStream = dataPacket.getData();
+					final byte[] data = new byte[(int) dataPacket.getSize()];
+					StreamUtils.fillBuffer(inStream, data);
+
+					final Map<String, String> attributes = dataPacket.getAttributes();
+
+					niFiDataPackets.add(new StandardNiFiDataPacket(data, attributes));
+					dataPacket = transaction.receive();
+				} while (dataPacket != null);
+
+				// Confirm transaction to verify the data
+				transaction.confirm();
+
+				for (NiFiDataPacket dp : niFiDataPackets) {
+					ctx.collect(dp);
+				}
+
+				transaction.complete();
+			}
+		} finally {
+			ctx.close();
+		}
+	}
+
+	@Override
+	public void cancel() {
+		running = false;
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		client.close();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92fb06a1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
new file mode 100644
index 0000000..5ad4bae
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * An implementation of NiFiDataPacket.
+ */
+public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
+	private static final long serialVersionUID = 6364005260220243322L;
+
+	private final byte[] content;
+	private final Map<String, String> attributes;
+
+	public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes) {
+		this.content = content;
+		this.attributes = attributes;
+	}
+
+	@Override
+	public byte[] getContent() {
+		return content;
+	}
+
+	@Override
+	public Map<String, String> getAttributes() {
+		return attributes;
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/92fb06a1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
new file mode 100644
index 0000000..572f949
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
@@ -0,0 +1,55 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.connectors.nifi.examples;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
+import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
+import org.apache.flink.streaming.connectors.nifi.NiFiSink;
+import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+
+import java.util.HashMap;
+
+/**
+ * An example topology that sends data to a NiFi input port named "Data from Flink".
+ */
+public class NiFiSinkTopologyExample {
+
+	public static void main(String[] args) throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+				.url("http://localhost:8080/nifi")
+				.portName("Data from Flink")
+				.buildConfig();
+
+		DataStreamSink<String> dataStream = env.fromElements("one", "two", "three", "four", "five", "q")
+				.addSink(new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<String>() {
+					@Override
+					public NiFiDataPacket createNiFiDataPacket(String s, RuntimeContext ctx) {
+						return new StandardNiFiDataPacket(s.getBytes(), new HashMap<String,String>());
+					}
+				}));
+
+		env.execute();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92fb06a1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
new file mode 100644
index 0000000..79c9a1c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
@@ -0,0 +1,58 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.connectors.nifi.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
+import org.apache.flink.streaming.connectors.nifi.NiFiSource;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+
+import java.nio.charset.Charset;
+
+/**
+ * An example topology that pulls data from a NiFi output port named "Data for Flink".
+ */
+public class NiFiSourceTopologyExample {
+
+	public static void main(String[] args) throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+				.url("http://localhost:8080/nifi")
+				.portName("Data for Flink")
+				.requestBatchCount(5)
+				.buildConfig();
+
+		SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
+		DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource).setParallelism(2);
+
+		DataStream<String> dataStream = streamSource.map(new MapFunction<NiFiDataPacket, String>() {
+			@Override
+			public String map(NiFiDataPacket value) throws Exception {
+				return new String(value.getContent(), Charset.defaultCharset());
+			}
+		});
+
+		dataStream.print();
+		env.execute();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92fb06a1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
new file mode 100644
index 0000000..d373d63
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
@@ -0,0 +1,16 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<template><description></description><name>NiFi_Flink</name><snippet><connections><id>34acfdda-dd21-48c0-8779-95d0e258f5cb</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>8b8b6a2f-ee24-4f32-a178-248f3a3f5482</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>769242e5-ee04-4656-a684-ca661a18eed6</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>59574e3b-1ba7-4343-b265-af1b67923a85</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThresh
 old>0</backPressureObjectThreshold><destination><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>48042218-a51e-45c7-bd30-2290bba8b191</id><type>OUTPUT_PORT</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>8b8b6a2f-ee24-4f32-a178-248f3a3f5482</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>46c9343f-f732-4e2d-98e1-13caab5d2f5e</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>e3a9026f-dae1-42ca-851c-02d9fda22094</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><source><groupI
 d>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>cd8c5227-cfa9-4603-9472-b2234d7bd741</id><type>INPUT_PORT</type></source><zIndex>0</zIndex></connections><inputPorts><id>cd8c5227-cfa9-4603-9472-b2234d7bd741</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>395.0</x><y>520.0</y></position><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><name>Data from Flink</name><state>RUNNING</state><transmitting>false</transmitting><type>INPUT_PORT</type></inputPorts><outputPorts><id>48042218-a51e-45c7-bd30-2290bba8b191</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>1616.0</x><y>259.0</y></position><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><name>Data for Flink</name><state>RUNNING</state><transmitting>false</transmitting><type>OUTPUT_PORT</type></outputPorts><processors><id>769242e5-ee04-4656-a684-ca661a18eed6</id><parentGroupId>0f854f2b-239f-45f0-bfed-48
 b5b23f7928</parentGroupId><position><x>389.0</x><y>231.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>File Size</key><value><description>The size of the file that will be used</description><displayName>File Size</displayName><dynamic>false</dynamic><name>File Size</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Batch Size</key><value><defaultValue>1</defaultValue><description>The number of FlowFiles to be transferr
 ed in each invocation</description><displayName>Batch Size</displayName><dynamic>false</dynamic><name>Batch Size</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Data Format</key><value><allowableValues><displayName>Binary</displayName><value>Binary</value></allowableValues><allowableValues><displayName>Text</displayName><value>Text</value></allowableValues><defaultValue>Binary</defaultValue><description>Specifies whether the data should be Text or Binary</description><displayName>Data Format</displayName><dynamic>false</dynamic><name>Data Format</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Unique FlowFiles</key><value><allowableValues><displayName>true</displayName><value>true</value></allowableValues><allowableValues><displayName>false</displayName><value>false</value></allowableValues><defaultValue>false</defaultValue><description>If true, ea
 ch FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles will get the same content but this offers much higher throughput</description><displayName>Unique FlowFiles</displayName><dynamic>false</dynamic><name>Unique FlowFiles</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>File Size</key><value>1 b</value></entry><entry><key>Batch Size</key><value>1</value></entry><entry><key>Data Format</key><value>Binary</value></entry><entry><key>Unique FlowFiles</key><value>false</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>2 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>GenerateFlowFile</name><relationships><autoTerminate>false</autoTerminate><description></des
 cription><name>success</name></relationships><state>STOPPED</state><style/><supportsEventDriven>false</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.GenerateFlowFile</type></processors><processors><id>e3a9026f-dae1-42ca-851c-02d9fda22094</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>826.0</x><y>499.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>Log Level</key><value><al
 lowableValues><displayName>trace</displayName><value>trace</value></allowableValues><allowableValues><displayName>debug</displayName><value>debug</value></allowableValues><allowableValues><displayName>info</displayName><value>info</value></allowableValues><allowableValues><displayName>warn</displayName><value>warn</value></allowableValues><allowableValues><displayName>error</displayName><value>error</value></allowableValues><defaultValue>info</defaultValue><description>The Log Level to use when logging the Attributes</description><displayName>Log Level</displayName><dynamic>false</dynamic><name>Log Level</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Log Payload</key><value><allowableValues><displayName>true</displayName><value>true</value></allowableValues><allowableValues><displayName>false</displayName><value>false</value></allowableValues><defaultValue>false</defaultValue><description>If true, the FlowFile's p
 ayload will be logged, in addition to its attributes; otherwise, just the Attributes will be logged.</description><displayName>Log Payload</displayName><dynamic>false</dynamic><name>Log Payload</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Attributes to Log</key><value><description>A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.</description><displayName>Attributes to Log</displayName><dynamic>false</dynamic><name>Attributes to Log</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Attributes to Ignore</key><value><description>A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored.</description><displayName>Attributes to Ignore</displayName><dynamic>false</dynamic><name>Attributes to Ignore</name><required>false</required><sensitive>false</sensitive><supportsEl>
 false</supportsEl></value></entry><entry><key>Log prefix</key><value><description>Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.</description><displayName>Log prefix</displayName><dynamic>false</dynamic><name>Log prefix</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Log Level</key></entry><entry><key>Log Payload</key><value>true</value></entry><entry><key>Attributes to Log</key></entry><entry><key>Attributes to Ignore</key></entry><entry><key>Log prefix</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>LogAttribute</name><relationships><autoTerminate>true</autoTerminate><description>All FlowFil
 es are routed to this relationship</description><name>success</name></relationships><state>RUNNING</state><style/><supportsEventDriven>true</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.LogAttribute</type></processors><processors><id>8b8b6a2f-ee24-4f32-a178-248f3a3f5482</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>1000.0</x><y>231.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><e
 ntry><key>Regular Expression</key><value><defaultValue>(?s:^.*$)</defaultValue><description>The Regular Expression to search for in the FlowFile content</description><displayName>Regular Expression</displayName><dynamic>false</dynamic><name>Regular Expression</name><required>true</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Replacement Value</key><value><defaultValue>$1</defaultValue><description>The value to replace the regular expression with. Back-references to Regular Expression capturing groups are supported, but back-references that reference capturing groups that do not exist in the regular expression will be treated as literal value.</description><displayName>Replacement Value</displayName><dynamic>false</dynamic><name>Replacement Value</name><required>true</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Character Set</key><value><defaultValue>UTF-8</defaultValue><description>The
  Character Set in which the file is encoded</description><displayName>Character Set</displayName><dynamic>false</dynamic><name>Character Set</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Maximum Buffer Size</key><value><defaultValue>1 MB</defaultValue><description>Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) in order to apply the regular expressions. If 'Entire Text' (in Evaluation Mode) is selected and the FlowFile is larger than this value, the FlowFile will be routed to 'failure'. In 'Line-by-Line' Mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. A default value of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' Mode, a value such as 8 KB or 16 KB is suggested. This value is ignored and the buffer is not used if 'Regular Expression' is set to '.*'</description><displayName>Maximum Buffer 
 Size</displayName><dynamic>false</dynamic><name>Maximum Buffer Size</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Evaluation Mode</key><value><allowableValues><displayName>Line-by-Line</displayName><value>Line-by-Line</value></allowableValues><allowableValues><displayName>Entire text</displayName><value>Entire text</value></allowableValues><defaultValue>Entire text</defaultValue><description>Evaluate the 'Regular Expression' against each line (Line-by-Line) or buffer the entire file into memory (Entire Text) and then evaluate the 'Regular Expression'.</description><displayName>Evaluation Mode</displayName><dynamic>false</dynamic><name>Evaluation Mode</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Regular Expression</key><value>(?s:^.*$)</value><
 /entry><entry><key>Replacement Value</key><value>blah blah</value></entry><entry><key>Character Set</key><value>UTF-8</value></entry><entry><key>Maximum Buffer Size</key><value>1 MB</value></entry><entry><key>Evaluation Mode</key><value>Entire text</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>ReplaceText</name><relationships><autoTerminate>true</autoTerminate><description>FlowFiles that could not be updated are routed to this relationship</description><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><description>FlowFiles that have been successfully updated are routed to this relationship, as well as FlowFiles whose content does not match the given Regular Expression</description><name>success</name></relationships><state>RUNNING</state><style/><supportsEventDriven>true</supports
 EventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.ReplaceText</type></processors></snippet><timestamp>09/30/2015 09:10:38 EDT</timestamp></template>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/92fb06a1/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
index 142d7c9..822ca26 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
@@ -41,6 +41,7 @@ under the License.
 		<module>flink-connector-elasticsearch</module>
 		<module>flink-connector-rabbitmq</module>
 		<module>flink-connector-twitter</module>
+		<module>flink-connector-nifi</module>
 	</modules>
 
 	<!-- See main pom.xml for explanation of profiles -->


Mime
View raw message