flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [08/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package
Date Wed, 21 Oct 2015 09:03:24 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java
new file mode 100644
index 0000000..987e6c5
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+/**
+ * Sink function that discards data.
+ * @param <T> The type of the function.
+ */
+public class DiscardingSink<T> implements SinkFunction<T> {
+
+	private static final long serialVersionUID = 2777597566520109843L;
+
+	@Override
+	public void invoke(T value) {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
new file mode 100644
index 0000000..5a8ffaa
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
+		Checkpointed<Integer>, CheckpointNotifier, Runnable {
+	
+	private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class);
+	
+	private static final long serialVersionUID = 6334389850158707313L;
+	
+	public static volatile boolean failedBefore;
+	public static volatile boolean hasBeenCheckpointedBeforeFailure;
+
+	private final int failCount;
+	private int numElementsTotal;
+	private int numElementsThisTime;
+	
+	private boolean failer;
+	private boolean hasBeenCheckpointed;
+	
+	private Thread printer;
+	private volatile boolean printerRunning = true;
+
+	public FailingIdentityMapper(int failCount) {
+		this.failCount = failCount;
+	}
+
+	@Override
+	public void open(Configuration parameters) {
+		failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
+		printer = new Thread(this, "FailingIdentityMapper Status Printer");
+		printer.start();
+	}
+
+	@Override
+	public T map(T value) throws Exception {
+		numElementsTotal++;
+		numElementsThisTime++;
+		
+		if (!failedBefore) {
+			Thread.sleep(10);
+			
+			if (failer && numElementsTotal >= failCount) {
+				hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
+				failedBefore = true;
+				throw new Exception("Artificial Test Failure");
+			}
+		}
+		return value;
+	}
+
+	@Override
+	public void close() throws Exception {
+		printerRunning = false;
+		if (printer != null) {
+			printer.interrupt();
+			printer = null;
+		}
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) {
+		this.hasBeenCheckpointed = true;
+	}
+
+	@Override
+	public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+		return numElementsTotal;
+	}
+
+	@Override
+	public void restoreState(Integer state) {
+		numElementsTotal = state;
+	}
+
+	@Override
+	public void run() {
+		while (printerRunning) {
+			try {
+				Thread.sleep(5000);
+			}
+			catch (InterruptedException e) {
+				// ignore
+			}
+			LOG.info("============================> Failing mapper  {}: count={}, totalCount={}",
+					getRuntimeContext().getIndexOfThisSubtask(),
+					numElementsThisTime, numElementsTotal);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
new file mode 100644
index 0000000..e94adb5
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class JobManagerCommunicationUtils {
+	
+	private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
+	
+	
+	public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
+		
+		// find the jobID
+		Future<Object> listResponse = jobManager.ask(
+				JobManagerMessages.getRequestRunningJobsStatus(),
+				askTimeout);
+
+		List<JobStatusMessage> jobs;
+		try {
+			Object result = Await.result(listResponse, askTimeout);
+			jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
+		}
+		catch (Exception e) {
+			throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
+		}
+		
+		if (jobs.isEmpty()) {
+			throw new Exception("Could not cancel job - no running jobs");
+		}
+		if (jobs.size() != 1) {
+			throw new Exception("Could not cancel job - more than one running job.");
+		}
+		
+		JobStatusMessage status = jobs.get(0);
+		if (status.getJobState().isTerminalState()) {
+			throw new Exception("Could not cancel job - job is not running any more");
+		}
+		
+		JobID jobId = status.getJobId();
+		
+		Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), askTimeout);
+		try {
+			Await.result(response, askTimeout);
+		}
+		catch (Exception e) {
+			throw new Exception("Sending the 'cancel' message failed.", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
new file mode 100644
index 0000000..b9fc3de
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.Histogram;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+public class MockRuntimeContext implements RuntimeContext {
+
+	private final int numberOfParallelSubtasks;
+	private final int indexOfThisSubtask;
+
+	public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
+		this.numberOfParallelSubtasks = numberOfParallelSubtasks;
+		this.indexOfThisSubtask = indexOfThisSubtask;
+	}
+
+
+	@Override
+	public String getTaskName() {
+		return null;
+	}
+
+	@Override
+	public int getNumberOfParallelSubtasks() {
+		return numberOfParallelSubtasks;
+	}
+
+	@Override
+	public int getIndexOfThisSubtask() {
+		return indexOfThisSubtask;
+	}
+
+	@Override
+	public ExecutionConfig getExecutionConfig() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public ClassLoader getUserCodeClassLoader() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Map<String, Accumulator<?, ?>> getAllAccumulators() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public IntCounter getIntCounter(String name) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public LongCounter getLongCounter(String name) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public DoubleCounter getDoubleCounter(String name) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Histogram getHistogram(String name) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <RT> List<RT> getBroadcastVariable(String name) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public DistributedCache getDistributedCache() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
new file mode 100644
index 0000000..e105e01
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+public class PartitionValidatingMapper implements MapFunction<Integer, Integer> {
+
+	private static final long serialVersionUID = 1088381231244959088L;
+	
+	/* the partitions from which this function received data */
+	private final Set<Integer> myPartitions = new HashSet<>();
+	
+	private final int numPartitions;
+	private final int maxPartitions;
+
+	public PartitionValidatingMapper(int numPartitions, int maxPartitions) {
+		this.numPartitions = numPartitions;
+		this.maxPartitions = maxPartitions;
+	}
+
+	@Override
+	public Integer map(Integer value) throws Exception {
+		// validate that the partitioning is identical
+		int partition = value % numPartitions;
+		myPartitions.add(partition);
+		if (myPartitions.size() > maxPartitions) {
+			throw new Exception("Error: Elements from too many different partitions: " + myPartitions
+					+ ". Expect elements only from " + maxPartitions + " partitions");
+		}
+		return value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java
new file mode 100644
index 0000000..12e3460
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+/**
+ * Exception that is thrown to terminate a program and indicate success.
+ */
+public class SuccessException extends Exception {
+	private static final long serialVersionUID = -7011865671593955887L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
new file mode 100644
index 0000000..1d61229
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+/**
+ * An identity map function that sleeps between elements, throttling the
+ * processing speed.
+ * 
+ * @param <T> The type mapped.
+ */
+public class ThrottledMapper<T> implements MapFunction<T,T> {
+
+	private static final long serialVersionUID = 467008933767159126L;
+
+	private final int sleep;
+
+	public ThrottledMapper(int sleep) {
+		this.sleep = sleep;
+	}
+
+	@Override
+	public T map(T value) throws Exception {
+		Thread.sleep(this.sleep);
+		return value;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
new file mode 100644
index 0000000..b762e21
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+
+import java.io.Serializable;
+
+/**
+ * Special partitioner that uses the first field of a 2-tuple as the partition,
+ * and that expects a specific number of partitions.
+ */
+public class Tuple2Partitioner extends KafkaPartitioner implements Serializable {
+	
+	private static final long serialVersionUID = 1L;
+
+	private final int expectedPartitions;
+
+	
+	public Tuple2Partitioner(int expectedPartitions) {
+		this.expectedPartitions = expectedPartitions;
+	}
+
+	@Override
+	public int partition(Object key, int numPartitions) {
+		if (numPartitions != expectedPartitions) {
+			throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions");
+		}
+		@SuppressWarnings("unchecked")
+		Tuple2<Integer, Integer> element = (Tuple2<Integer, Integer>) key;
+		
+		return element.f0;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
new file mode 100644
index 0000000..f3cc4fa
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.BitSet;
+
+public class ValidatingExactlyOnceSink implements SinkFunction<Integer>, Checkpointed<Tuple2<Integer, BitSet>> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
+
+	private static final long serialVersionUID = 1748426382527469932L;
+	
+	private final int numElementsTotal;
+	
+	private BitSet duplicateChecker = new BitSet();  // this is checkpointed
+
+	private int numElements; // this is checkpointed
+
+	
+	public ValidatingExactlyOnceSink(int numElementsTotal) {
+		this.numElementsTotal = numElementsTotal;
+	}
+
+	
+	@Override
+	public void invoke(Integer value) throws Exception {
+		numElements++;
+		
+		if (duplicateChecker.get(value)) {
+			throw new Exception("Received a duplicate");
+		}
+		duplicateChecker.set(value);
+		if (numElements == numElementsTotal) {
+			// validate
+			if (duplicateChecker.cardinality() != numElementsTotal) {
+				throw new Exception("Duplicate checker has wrong cardinality");
+			}
+			else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
+				throw new Exception("Received sparse sequence");
+			}
+			else {
+				throw new SuccessException();
+			}
+		}
+	}
+
+	@Override
+	public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long checkpointTimestamp) {
+		LOG.info("Snapshot of counter "+numElements+" at checkpoint "+checkpointId);
+		return new Tuple2<>(numElements, duplicateChecker);
+	}
+
+	@Override
+	public void restoreState(Tuple2<Integer, BitSet> state) {
+		LOG.info("restoring num elements to {}", state.f0);
+		this.numElements = state.f0;
+		this.duplicateChecker = state.f1;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..6bdfb48
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
@@ -0,0 +1,29 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-nifi/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-nifi/pom.xml b/flink-streaming-connectors/flink-connector-nifi/pom.xml
new file mode 100644
index 0000000..a590b07
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-nifi/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-connectors-parent</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-nifi</artifactId>
+	<name>flink-connector-nifi</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<nifi.version>0.3.0</nifi.version>
+	</properties>
+
+	<dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-site-to-site-client</artifactId>
+            <version>${nifi.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-tests</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<rerunFailingTestsCount>3</rerunFailingTestsCount>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-failsafe-plugin</artifactId>
+				<configuration>
+					<rerunFailingTestsCount>3</rerunFailingTestsCount>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
new file mode 100644
index 0000000..c8ceb57
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import java.util.Map;
+
+/**
+ * <p>
+ * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both
+ * a FlowFile's content and its attributes so that they can be processed by Flink.
+ * </p>
+ */
+public interface NiFiDataPacket {
+
+	/**
+	 * @return the contents of a NiFi FlowFile
+	 */
+	byte[] getContent();
+
+	/**
+	 * @return a Map of attributes that are associated with the NiFi FlowFile
+	 */
+	Map<String, String> getAttributes();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
new file mode 100644
index 0000000..9bb521b
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import java.io.Serializable;
+
+/**
+ * A function that can create a NiFiDataPacket from an incoming instance of the given type.
+ *
+ * @param <T>
+ */
+public interface NiFiDataPacketBuilder<T> extends Function, Serializable {
+
+	NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
new file mode 100644
index 0000000..abc6b35
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+
+/**
+ * A sink that delivers data to Apache NiFi using the NiFi Site-to-Site client. The sink requires
+ * a NiFiDataPacketBuilder which can create instances of NiFiDataPacket from the incoming data.
+ */
+public class NiFiSink<T> extends RichSinkFunction<T> {
+
+	private SiteToSiteClient client;
+	private SiteToSiteClientConfig clientConfig;
+	private NiFiDataPacketBuilder<T> builder;
+
+	/**
+	 * Construct a new NiFiSink with the given client config and NiFiDataPacketBuilder.
+	 *
+	 * @param clientConfig the configuration for building a NiFi SiteToSiteClient
+	 * @param builder a builder to produce NiFiDataPackets from incoming data
+	 */
+	public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder<T> builder) {
+		this.clientConfig = clientConfig;
+		this.builder = builder;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		this.client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
+	}
+
+	@Override
+	public void invoke(T value) throws Exception {
+		final NiFiDataPacket niFiDataPacket = builder.createNiFiDataPacket(value, getRuntimeContext());
+
+		final Transaction transaction = client.createTransaction(TransferDirection.SEND);
+		if (transaction == null) {
+			throw new IllegalStateException("Unable to create a NiFi Transaction to send data");
+		}
+
+		transaction.send(niFiDataPacket.getContent(), niFiDataPacket.getAttributes());
+		transaction.confirm();
+		transaction.complete();
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		client.close();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
new file mode 100644
index 0000000..a213bb4
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A source that pulls data from Apache NiFi using the NiFi Site-to-Site client. This source
+ * produces NiFiDataPackets which encapsulate the content and attributes of a NiFi FlowFile.
+ */
+public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(NiFiSource.class);
+
+	private static final long DEFAULT_WAIT_TIME_MS = 1000;
+
+	private long waitTimeMs;
+	private SiteToSiteClient client;
+	private SiteToSiteClientConfig clientConfig;
+	private transient volatile boolean running;
+
+	/**
+	 * Constructs a new NiFiSource using the given client config and the default wait time of 1000 ms.
+	 *
+	 * @param clientConfig the configuration for building a NiFi SiteToSiteClient
+	 */
+	public NiFiSource(SiteToSiteClientConfig clientConfig) {
+		this(clientConfig, DEFAULT_WAIT_TIME_MS);
+	}
+
+	/**
+	 * Constructs a new NiFiSource using the given client config and wait time.
+	 *
+	 * @param clientConfig the configuration for building a NiFi SiteToSiteClient
+	 * @param waitTimeMs the amount of time to wait (in milliseconds) if no data is available to pull from NiFi
+	 */
+	public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs) {
+		this.clientConfig = clientConfig;
+		this.waitTimeMs = waitTimeMs;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
+		running = true;
+	}
+
+	@Override
+	public void run(SourceContext<NiFiDataPacket> ctx) throws Exception {
+		try {
+			while (running) {
+				final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
+				if (transaction == null) {
+					LOG.warn("A transaction could not be created, waiting and will try again...");
+					try {
+						Thread.sleep(waitTimeMs);
+					} catch (InterruptedException e) {
+
+					}
+					continue;
+				}
+
+				DataPacket dataPacket = transaction.receive();
+				if (dataPacket == null) {
+					transaction.confirm();
+					transaction.complete();
+
+					LOG.debug("No data available to pull, waiting and will try again...");
+					try {
+						Thread.sleep(waitTimeMs);
+					} catch (InterruptedException e) {
+
+					}
+					continue;
+				}
+
+				final List<NiFiDataPacket> niFiDataPackets = new ArrayList<>();
+				do {
+					// Read the data into a byte array and wrap it along with the attributes
+					// into a NiFiDataPacket.
+					final InputStream inStream = dataPacket.getData();
+					final byte[] data = new byte[(int) dataPacket.getSize()];
+					StreamUtils.fillBuffer(inStream, data);
+
+					final Map<String, String> attributes = dataPacket.getAttributes();
+
+					niFiDataPackets.add(new StandardNiFiDataPacket(data, attributes));
+					dataPacket = transaction.receive();
+				} while (dataPacket != null);
+
+				// Confirm transaction to verify the data
+				transaction.confirm();
+
+				for (NiFiDataPacket dp : niFiDataPackets) {
+					ctx.collect(dp);
+				}
+
+				transaction.complete();
+			}
+		} finally {
+			ctx.close();
+		}
+	}
+
+	@Override
+	public void cancel() {
+		running = false;
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		client.close();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
new file mode 100644
index 0000000..5ad4bae
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * An implementation of NiFiDataPacket.
+ */
+public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
+	private static final long serialVersionUID = 6364005260220243322L;
+
+	private final byte[] content;
+	private final Map<String, String> attributes;
+
+	public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes) {
+		this.content = content;
+		this.attributes = attributes;
+	}
+
+	@Override
+	public byte[] getContent() {
+		return content;
+	}
+
+	@Override
+	public Map<String, String> getAttributes() {
+		return attributes;
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
new file mode 100644
index 0000000..572f949
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
@@ -0,0 +1,55 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.connectors.nifi.examples;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
+import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
+import org.apache.flink.streaming.connectors.nifi.NiFiSink;
+import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+
+import java.util.HashMap;
+
+/**
+ * An example topology that sends data to a NiFi input port named "Data from Flink".
+ */
+public class NiFiSinkTopologyExample {
+
+	public static void main(String[] args) throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+				.url("http://localhost:8080/nifi")
+				.portName("Data from Flink")
+				.buildConfig();
+
+		DataStreamSink<String> dataStream = env.fromElements("one", "two", "three", "four", "five", "q")
+				.addSink(new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<String>() {
+					@Override
+					public NiFiDataPacket createNiFiDataPacket(String s, RuntimeContext ctx) {
+						return new StandardNiFiDataPacket(s.getBytes(), new HashMap<String,String>());
+					}
+				}));
+
+		env.execute();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
new file mode 100644
index 0000000..79c9a1c
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
@@ -0,0 +1,58 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.connectors.nifi.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
+import org.apache.flink.streaming.connectors.nifi.NiFiSource;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+
+import java.nio.charset.Charset;
+
+/**
+ * An example topology that pulls data from a NiFi output port named "Data for Flink".
+ */
+public class NiFiSourceTopologyExample {
+
+	public static void main(String[] args) throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+				.url("http://localhost:8080/nifi")
+				.portName("Data for Flink")
+				.requestBatchCount(5)
+				.buildConfig();
+
+		SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
+		DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource).setParallelism(2);
+
+		DataStream<String> dataStream = streamSource.map(new MapFunction<NiFiDataPacket, String>() {
+			@Override
+			public String map(NiFiDataPacket value) throws Exception {
+				return new String(value.getContent(), Charset.defaultCharset());
+			}
+		});
+
+		dataStream.print();
+		env.execute();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml b/flink-streaming-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
new file mode 100644
index 0000000..d373d63
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
@@ -0,0 +1,16 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<template><description></description><name>NiFi_Flink</name><snippet><connections><id>34acfdda-dd21-48c0-8779-95d0e258f5cb</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>8b8b6a2f-ee24-4f32-a178-248f3a3f5482</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>769242e5-ee04-4656-a684-ca661a18eed6</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>59574e3b-1ba7-4343-b265-af1b67923a85</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThresh
 old>0</backPressureObjectThreshold><destination><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>48042218-a51e-45c7-bd30-2290bba8b191</id><type>OUTPUT_PORT</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>8b8b6a2f-ee24-4f32-a178-248f3a3f5482</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>46c9343f-f732-4e2d-98e1-13caab5d2f5e</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>e3a9026f-dae1-42ca-851c-02d9fda22094</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><source><groupI
 d>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>cd8c5227-cfa9-4603-9472-b2234d7bd741</id><type>INPUT_PORT</type></source><zIndex>0</zIndex></connections><inputPorts><id>cd8c5227-cfa9-4603-9472-b2234d7bd741</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>395.0</x><y>520.0</y></position><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><name>Data from Flink</name><state>RUNNING</state><transmitting>false</transmitting><type>INPUT_PORT</type></inputPorts><outputPorts><id>48042218-a51e-45c7-bd30-2290bba8b191</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>1616.0</x><y>259.0</y></position><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><name>Data for Flink</name><state>RUNNING</state><transmitting>false</transmitting><type>OUTPUT_PORT</type></outputPorts><processors><id>769242e5-ee04-4656-a684-ca661a18eed6</id><parentGroupId>0f854f2b-239f-45f0-bfed-48
 b5b23f7928</parentGroupId><position><x>389.0</x><y>231.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>File Size</key><value><description>The size of the file that will be used</description><displayName>File Size</displayName><dynamic>false</dynamic><name>File Size</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Batch Size</key><value><defaultValue>1</defaultValue><description>The number of FlowFiles to be transferr
 ed in each invocation</description><displayName>Batch Size</displayName><dynamic>false</dynamic><name>Batch Size</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Data Format</key><value><allowableValues><displayName>Binary</displayName><value>Binary</value></allowableValues><allowableValues><displayName>Text</displayName><value>Text</value></allowableValues><defaultValue>Binary</defaultValue><description>Specifies whether the data should be Text or Binary</description><displayName>Data Format</displayName><dynamic>false</dynamic><name>Data Format</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Unique FlowFiles</key><value><allowableValues><displayName>true</displayName><value>true</value></allowableValues><allowableValues><displayName>false</displayName><value>false</value></allowableValues><defaultValue>false</defaultValue><description>If true, ea
 ch FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles will get the same content but this offers much higher throughput</description><displayName>Unique FlowFiles</displayName><dynamic>false</dynamic><name>Unique FlowFiles</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>File Size</key><value>1 b</value></entry><entry><key>Batch Size</key><value>1</value></entry><entry><key>Data Format</key><value>Binary</value></entry><entry><key>Unique FlowFiles</key><value>false</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>2 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>GenerateFlowFile</name><relationships><autoTerminate>false</autoTerminate><description></des
 cription><name>success</name></relationships><state>STOPPED</state><style/><supportsEventDriven>false</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.GenerateFlowFile</type></processors><processors><id>e3a9026f-dae1-42ca-851c-02d9fda22094</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>826.0</x><y>499.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>Log Level</key><value><al
 lowableValues><displayName>trace</displayName><value>trace</value></allowableValues><allowableValues><displayName>debug</displayName><value>debug</value></allowableValues><allowableValues><displayName>info</displayName><value>info</value></allowableValues><allowableValues><displayName>warn</displayName><value>warn</value></allowableValues><allowableValues><displayName>error</displayName><value>error</value></allowableValues><defaultValue>info</defaultValue><description>The Log Level to use when logging the Attributes</description><displayName>Log Level</displayName><dynamic>false</dynamic><name>Log Level</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Log Payload</key><value><allowableValues><displayName>true</displayName><value>true</value></allowableValues><allowableValues><displayName>false</displayName><value>false</value></allowableValues><defaultValue>false</defaultValue><description>If true, the FlowFile's p
 ayload will be logged, in addition to its attributes; otherwise, just the Attributes will be logged.</description><displayName>Log Payload</displayName><dynamic>false</dynamic><name>Log Payload</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Attributes to Log</key><value><description>A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.</description><displayName>Attributes to Log</displayName><dynamic>false</dynamic><name>Attributes to Log</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Attributes to Ignore</key><value><description>A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored.</description><displayName>Attributes to Ignore</displayName><dynamic>false</dynamic><name>Attributes to Ignore</name><required>false</required><sensitive>false</sensitive><supportsEl>
 false</supportsEl></value></entry><entry><key>Log prefix</key><value><description>Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.</description><displayName>Log prefix</displayName><dynamic>false</dynamic><name>Log prefix</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Log Level</key></entry><entry><key>Log Payload</key><value>true</value></entry><entry><key>Attributes to Log</key></entry><entry><key>Attributes to Ignore</key></entry><entry><key>Log prefix</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>LogAttribute</name><relationships><autoTerminate>true</autoTerminate><description>All FlowFil
 es are routed to this relationship</description><name>success</name></relationships><state>RUNNING</state><style/><supportsEventDriven>true</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.LogAttribute</type></processors><processors><id>8b8b6a2f-ee24-4f32-a178-248f3a3f5482</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>1000.0</x><y>231.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><e
 ntry><key>Regular Expression</key><value><defaultValue>(?s:^.*$)</defaultValue><description>The Regular Expression to search for in the FlowFile content</description><displayName>Regular Expression</displayName><dynamic>false</dynamic><name>Regular Expression</name><required>true</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Replacement Value</key><value><defaultValue>$1</defaultValue><description>The value to replace the regular expression with. Back-references to Regular Expression capturing groups are supported, but back-references that reference capturing groups that do not exist in the regular expression will be treated as literal value.</description><displayName>Replacement Value</displayName><dynamic>false</dynamic><name>Replacement Value</name><required>true</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Character Set</key><value><defaultValue>UTF-8</defaultValue><description>The
  Character Set in which the file is encoded</description><displayName>Character Set</displayName><dynamic>false</dynamic><name>Character Set</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Maximum Buffer Size</key><value><defaultValue>1 MB</defaultValue><description>Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) in order to apply the regular expressions. If 'Entire Text' (in Evaluation Mode) is selected and the FlowFile is larger than this value, the FlowFile will be routed to 'failure'. In 'Line-by-Line' Mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. A default value of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' Mode, a value such as 8 KB or 16 KB is suggested. This value is ignored and the buffer is not used if 'Regular Expression' is set to '.*'</description><displayName>Maximum Buffer 
 Size</displayName><dynamic>false</dynamic><name>Maximum Buffer Size</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Evaluation Mode</key><value><allowableValues><displayName>Line-by-Line</displayName><value>Line-by-Line</value></allowableValues><allowableValues><displayName>Entire text</displayName><value>Entire text</value></allowableValues><defaultValue>Entire text</defaultValue><description>Evaluate the 'Regular Expression' against each line (Line-by-Line) or buffer the entire file into memory (Entire Text) and then evaluate the 'Regular Expression'.</description><displayName>Evaluation Mode</displayName><dynamic>false</dynamic><name>Evaluation Mode</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Regular Expression</key><value>(?s:^.*$)</value><
 /entry><entry><key>Replacement Value</key><value>blah blah</value></entry><entry><key>Character Set</key><value>UTF-8</value></entry><entry><key>Maximum Buffer Size</key><value>1 MB</value></entry><entry><key>Evaluation Mode</key><value>Entire text</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>ReplaceText</name><relationships><autoTerminate>true</autoTerminate><description>FlowFiles that could not be updated are routed to this relationship</description><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><description>FlowFiles that have been successfully updated are routed to this relationship, as well as FlowFiles whose content does not match the given Regular Expression</description><name>success</name></relationships><state>RUNNING</state><style/><supportsEventDriven>true</supports
 EventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.ReplaceText</type></processors></snippet><timestamp>09/30/2015 09:10:38 EDT</timestamp></template>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/pom.xml b/flink-streaming-connectors/flink-connector-rabbitmq/pom.xml
new file mode 100644
index 0000000..314289a
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-connectors-parent</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-rabbitmq</artifactId>
+	<name>flink-connector-rabbitmq</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<rabbitmq.version>3.3.1</rabbitmq.version>
+	</properties>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.rabbitmq</groupId>
+			<artifactId>amqp-client</artifactId>
+			<version>${rabbitmq.version}</version>
+		</dependency>
+
+	</dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
new file mode 100644
index 0000000..fa729d6
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import java.io.IOException;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+public class RMQSink<IN> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);
+
+	private String QUEUE_NAME;
+	private String HOST_NAME;
+	private transient ConnectionFactory factory;
+	private transient Connection connection;
+	private transient Channel channel;
+	private SerializationSchema<IN, byte[]> schema;
+
+	public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationSchema<IN, byte[]> schema) {
+		this.HOST_NAME = HOST_NAME;
+		this.QUEUE_NAME = QUEUE_NAME;
+		this.schema = schema;
+	}
+
+	/**
+	 * Initializes the connection to RMQ.
+	 */
+	public void initializeConnection() {
+		factory = new ConnectionFactory();
+		factory.setHost(HOST_NAME);
+		try {
+			connection = factory.newConnection();
+			channel = connection.createChannel();
+			channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	/**
+	 * Called when new data arrives to the sink, and forwards it to RMQ.
+	 * 
+	 * @param value
+	 *            The incoming data
+	 */
+	@Override
+	public void invoke(IN value) {
+		try {
+			byte[] msg = schema.serialize(value);
+
+			channel.basicPublish("", QUEUE_NAME, null, msg);
+
+		} catch (IOException e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Cannot send RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
+			}
+		}
+
+	}
+
+	/**
+	 * Closes the connection.
+	 */
+	private void closeChannel() {
+		try {
+			channel.close();
+			connection.close();
+		} catch (IOException e) {
+			throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME
+					+ " at " + HOST_NAME, e);
+		}
+
+	}
+
+	@Override
+	public void open(Configuration config) {
+		initializeConnection();
+	}
+
+	@Override
+	public void close() {
+		closeChannel();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
new file mode 100644
index 0000000..50149dc
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import java.io.IOException;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.ConnectorSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.QueueingConsumer;
+
+public class RMQSource<OUT> extends ConnectorSource<OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private final String QUEUE_NAME;
+	private final String HOST_NAME;
+
+	private transient ConnectionFactory factory;
+	private transient Connection connection;
+	private transient Channel channel;
+	private transient QueueingConsumer consumer;
+	private transient QueueingConsumer.Delivery delivery;
+
+	private transient volatile boolean running;
+
+	public RMQSource(String HOST_NAME, String QUEUE_NAME,
+			DeserializationSchema<OUT> deserializationSchema) {
+		super(deserializationSchema);
+		this.HOST_NAME = HOST_NAME;
+		this.QUEUE_NAME = QUEUE_NAME;
+	}
+
+	/**
+	 * Initializes the connection to RMQ.
+	 */
+	private void initializeConnection() {
+		factory = new ConnectionFactory();
+		factory.setHost(HOST_NAME);
+		try {
+			connection = factory.newConnection();
+			channel = connection.createChannel();
+			channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+			consumer = new QueueingConsumer(channel);
+			channel.basicConsume(QUEUE_NAME, true, consumer);
+		} catch (IOException e) {
+			throw new RuntimeException("Cannot create RMQ connection with " + QUEUE_NAME + " at "
+					+ HOST_NAME, e);
+		}
+	}
+
+	@Override
+	public void open(Configuration config) throws Exception {
+		initializeConnection();
+		running = true;
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		try {
+			connection.close();
+		} catch (IOException e) {
+			throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME
+					+ " at " + HOST_NAME, e);
+		}
+	}
+
+	@Override
+	public void run(SourceContext<OUT> ctx) throws Exception {
+		while (running) {
+			delivery = consumer.nextDelivery();
+
+			OUT result = schema.deserialize(delivery.getBody());
+			if (schema.isEndOfStream(result)) {
+				break;
+			}
+
+			ctx.collect(result);
+		}
+	}
+
+	@Override
+	public void cancel() {
+		running = false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
new file mode 100644
index 0000000..1f85862
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+public class RMQTopology {
+
+	public static void main(String[] args) throws Exception {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		@SuppressWarnings("unused")
+		DataStreamSink<String> dataStream1 = env.addSource(
+				new RMQSource<String>("localhost", "hello", new SimpleStringSchema())).print();
+
+		@SuppressWarnings("unused")
+		DataStreamSink<String> dataStream2 = env.fromElements("one", "two", "three", "four", "five",
+				"q").addSink(
+				new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));
+
+		env.execute();
+	}
+
+	public static class StringToByteSerializer implements SerializationSchema<String, byte[]> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public byte[] serialize(String element) {
+			return element.getBytes();
+		}
+	}
+}


Mime
View raw message