flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [25/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.
Date Fri, 02 Dec 2016 13:35:15 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
new file mode 100644
index 0000000..65e6d4e
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+import java.nio.ByteBuffer;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Factory for different kinds of fake Kinesis behaviours using the {@link KinesisProxyInterface} interface.
+ */
+public class FakeKinesisBehavioursFactory {
+
+	// ------------------------------------------------------------------------
+	//  Behaviours related to shard listing and resharding, used in KinesisDataFetcherTest
+	// ------------------------------------------------------------------------
+
+	public static KinesisProxyInterface noShardsFoundForRequestedStreamsBehaviour() {
+
+		return new KinesisProxyInterface() {
+			@Override
+			public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
+				return new GetShardListResult(); // not setting any retrieved shards for result
+			}
+
+			@Override
+			public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
+				return null;
+			}
+
+			@Override
+			public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
+				return null;
+			}
+		};
+
+	}
+
+	public static KinesisProxyInterface nonReshardedStreamsBehaviour(Map<String,Integer> streamsToShardCount) {
+		return new NonReshardedStreamsKinesis(streamsToShardCount);
+
+	}
+
+	// ------------------------------------------------------------------------
+	//  Behaviours related to fetching records, used mainly in ShardConsumerTest
+	// ------------------------------------------------------------------------
+
+	public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCalls(final int numOfRecords, final int numOfGetRecordsCalls) {
+		return new SingleShardEmittingFixNumOfRecordsKinesis(numOfRecords, numOfGetRecordsCalls);
+	}
+	
+	public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(
+		final int numOfRecords, final int numOfGetRecordsCall, final int orderOfCallToExpire) {
+		return new SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis(
+			numOfRecords, numOfGetRecordsCall, orderOfCallToExpire);
+	}
+
+	public static class SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis extends SingleShardEmittingFixNumOfRecordsKinesis {
+
+		private boolean expiredOnceAlready = false;
+		private boolean expiredIteratorRefreshed = false;
+		private final int orderOfCallToExpire;
+
+		public SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis(final int numOfRecords,
+																			final int numOfGetRecordsCalls,
+																			final int orderOfCallToExpire) {
+			super(numOfRecords, numOfGetRecordsCalls);
+			checkArgument(orderOfCallToExpire <= numOfGetRecordsCalls,
+				"can not test unexpected expired iterator if orderOfCallToExpire is larger than numOfGetRecordsCalls");
+			this.orderOfCallToExpire = orderOfCallToExpire;
+		}
+
+		@Override
+		public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
+			if ((Integer.valueOf(shardIterator) == orderOfCallToExpire-1) && !expiredOnceAlready) {
+				// we fake only once the expired iterator exception at the specified get records attempt order
+				expiredOnceAlready = true;
+				throw new ExpiredIteratorException("Artificial expired shard iterator");
+			} else if (expiredOnceAlready && !expiredIteratorRefreshed) {
+				// if we've thrown the expired iterator exception already, but the iterator was not refreshed,
+				// throw a hard exception to the test that is testing this Kinesis behaviour
+				throw new RuntimeException("expired shard iterator was not refreshed on the next getRecords() call");
+			} else {
+				// assuming that the maxRecordsToGet is always large enough
+				return new GetRecordsResult()
+					.withRecords(shardItrToRecordBatch.get(shardIterator))
+					.withNextShardIterator(
+						(Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls - 1)
+							? null : String.valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard iterator is null
+			}
+		}
+
+		@Override
+		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
+			if (!expiredOnceAlready) {
+				// for the first call, just return the iterator of the first batch of records
+				return "0";
+			} else {
+				// fake the iterator refresh when this is called again after getRecords throws expired iterator
+				// exception on the orderOfCallToExpire attempt
+				expiredIteratorRefreshed = true;
+				return String.valueOf(orderOfCallToExpire-1);
+			}
+		}
+	}
+
+	private static class SingleShardEmittingFixNumOfRecordsKinesis implements KinesisProxyInterface {
+
+		protected final int totalNumOfGetRecordsCalls;
+
+		protected final int totalNumOfRecords;
+
+		protected final Map<String,List<Record>> shardItrToRecordBatch;
+
+		public SingleShardEmittingFixNumOfRecordsKinesis(final int numOfRecords, final int numOfGetRecordsCalls) {
+			this.totalNumOfRecords = numOfRecords;
+			this.totalNumOfGetRecordsCalls = numOfGetRecordsCalls;
+
+			// initialize the record batches that we will be fetched
+			this.shardItrToRecordBatch = new HashMap<>();
+
+			int numOfAlreadyPartitionedRecords = 0;
+			int numOfRecordsPerBatch = numOfRecords/numOfGetRecordsCalls + 1;
+			for (int batch=0; batch<totalNumOfGetRecordsCalls; batch++) {
+				if (batch != totalNumOfGetRecordsCalls-1) {
+					shardItrToRecordBatch.put(
+						String.valueOf(batch),
+						createRecordBatchWithRange(
+							numOfAlreadyPartitionedRecords,
+							numOfAlreadyPartitionedRecords + numOfRecordsPerBatch));
+					numOfAlreadyPartitionedRecords += numOfRecordsPerBatch;
+				} else {
+					shardItrToRecordBatch.put(
+						String.valueOf(batch),
+						createRecordBatchWithRange(
+							numOfAlreadyPartitionedRecords,
+							totalNumOfRecords));
+				}
+			}
+		}
+
+		@Override
+		public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
+			// assuming that the maxRecordsToGet is always large enough
+			return new GetRecordsResult()
+				.withRecords(shardItrToRecordBatch.get(shardIterator))
+				.withNextShardIterator(
+					(Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls-1)
+						? null : String.valueOf(Integer.valueOf(shardIterator)+1)); // last next shard iterator is null
+		}
+
+		@Override
+		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
+			// this will be called only one time per ShardConsumer;
+			// so, simply return the iterator of the first batch of records
+			return "0";
+		}
+
+		@Override
+		public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
+			return null;
+		}
+
+		public static List<Record> createRecordBatchWithRange(int min, int max) {
+			List<Record> batch = new LinkedList<>();
+			for (int i = min; i < max; i++) {
+				batch.add(
+					new Record()
+						.withData(ByteBuffer.wrap(String.valueOf(i).getBytes()))
+						.withPartitionKey(UUID.randomUUID().toString())
+						.withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
+						.withSequenceNumber(String.valueOf(i)));
+			}
+			return batch;
+		}
+
+	}
+
+	private static class NonReshardedStreamsKinesis implements KinesisProxyInterface {
+
+		private Map<String, List<KinesisStreamShard>> streamsWithListOfShards = new HashMap<>();
+
+		public NonReshardedStreamsKinesis(Map<String,Integer> streamsToShardCount) {
+			for (Map.Entry<String,Integer> streamToShardCount : streamsToShardCount.entrySet()) {
+				String streamName = streamToShardCount.getKey();
+				int shardCount = streamToShardCount.getValue();
+
+				if (shardCount == 0) {
+					// don't do anything
+				} else {
+					List<KinesisStreamShard> shardsOfStream = new ArrayList<>(shardCount);
+					for (int i=0; i < shardCount; i++) {
+						shardsOfStream.add(
+							new KinesisStreamShard(
+								streamName,
+								new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i))));
+					}
+					streamsWithListOfShards.put(streamName, shardsOfStream);
+				}
+			}
+		}
+
+		@Override
+		public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
+			GetShardListResult result = new GetShardListResult();
+			for (Map.Entry<String, List<KinesisStreamShard>> streamsWithShards : streamsWithListOfShards.entrySet()) {
+				String streamName = streamsWithShards.getKey();
+				for (KinesisStreamShard shard : streamsWithShards.getValue()) {
+					if (streamNamesWithLastSeenShardIds.get(streamName) == null) {
+						result.addRetrievedShardToStream(streamName, shard);
+					} else {
+						if (KinesisStreamShard.compareShardIds(
+							shard.getShard().getShardId(), streamNamesWithLastSeenShardIds.get(streamName)) > 0) {
+							result.addRetrievedShardToStream(streamName, shard);
+						}
+					}
+				}
+			}
+			return result;
+		}
+
+		@Override
+		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
+			return null;
+		}
+
+		@Override
+		public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
new file mode 100644
index 0000000..fdfdfe1
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A thread that runs a topology with a manual data generator as source, and the FlinkKinesisProducer as sink.
+ */
+public class KinesisEventsGeneratorProducerThread {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisEventsGeneratorProducerThread.class);
+
+	public static Thread create(final int totalEventCount,
+								final int parallelism,
+								final String awsAccessKey,
+								final String awsSecretKey,
+								final String awsRegion,
+								final String kinesisStreamName,
+								final AtomicReference<Throwable> errorHandler,
+								final int flinkPort,
+								final Configuration flinkConfig) {
+		Runnable kinesisEventsGeneratorProducer = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort, flinkConfig);
+					see.setParallelism(parallelism);
+
+					// start data generator
+					DataStream<String> simpleStringStream = see.addSource(new KinesisEventsGeneratorProducerThread.EventsGenerator(totalEventCount)).setParallelism(1);
+
+					Properties producerProps = new Properties();
+					producerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, awsAccessKey);
+					producerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, awsSecretKey);
+					producerProps.setProperty(AWSConfigConstants.AWS_REGION, awsRegion);
+
+					FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(),
+						producerProps);
+
+					kinesis.setFailOnError(true);
+					kinesis.setDefaultStream(kinesisStreamName);
+					kinesis.setDefaultPartition("0");
+					simpleStringStream.addSink(kinesis);
+
+					LOG.info("Starting producing topology");
+					see.execute("Producing topology");
+					LOG.info("Producing topo finished");
+				} catch (Exception e) {
+					LOG.warn("Error while running producing topology", e);
+					errorHandler.set(e);
+				}
+			}
+		};
+
+		return new Thread(kinesisEventsGeneratorProducer);
+	}
+
+	private static class EventsGenerator implements SourceFunction<String> {
+
+		private static final Logger LOG = LoggerFactory.getLogger(EventsGenerator.class);
+
+		private boolean running = true;
+		private final long limit;
+
+		public EventsGenerator(long limit) {
+			this.limit = limit;
+		}
+
+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			long seq = 0;
+			while(running) {
+				Thread.sleep(10);
+				String evt = (seq++) + "-" + RandomStringUtils.randomAlphabetic(12);
+				ctx.collect(evt);
+				LOG.info("Emitting event {}", evt);
+				if(seq >= limit) {
+					break;
+				}
+			}
+			ctx.close();
+			LOG.info("Stopping events generator");
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java
new file mode 100644
index 0000000..c8dd347
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+public class KinesisShardIdGenerator {
+	// Kinesis shards ids are in the form of: ^shardId-\d{12}
+	public static String generateFromShardOrder(int order) {
+		return String.format("shardId-%012d", order);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
new file mode 100644
index 0000000..80ad06c
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Properties;
+
+public class TestableFlinkKinesisConsumer extends FlinkKinesisConsumer<String> {
+
+	private final RuntimeContext mockedRuntimeCtx;
+
+	public TestableFlinkKinesisConsumer(String fakeStream,
+										Properties fakeConfiguration,
+										final int totalNumOfConsumerSubtasks,
+										final int indexOfThisConsumerSubtask) {
+		super(fakeStream, new SimpleStringSchema(), fakeConfiguration);
+
+		this.mockedRuntimeCtx = Mockito.mock(RuntimeContext.class);
+
+		Mockito.when(mockedRuntimeCtx.getNumberOfParallelSubtasks()).thenAnswer(new Answer<Integer>() {
+			@Override
+			public Integer answer(InvocationOnMock invocationOnMock) throws Throwable {
+				return totalNumOfConsumerSubtasks;
+			}
+		});
+
+		Mockito.when(mockedRuntimeCtx.getIndexOfThisSubtask()).thenAnswer(new Answer<Integer>() {
+			@Override
+			public Integer answer(InvocationOnMock invocationOnMock) throws Throwable {
+				return indexOfThisConsumerSubtask;
+			}
+		});
+	}
+
+	@Override
+	public RuntimeContext getRuntimeContext() {
+		return this.mockedRuntimeCtx;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
new file mode 100644
index 0000000..57886fe
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class TestableKinesisDataFetcher extends KinesisDataFetcher<String> {
+
+	private static final Object fakeCheckpointLock = new Object();
+
+	private long numElementsCollected;
+
+	public TestableKinesisDataFetcher(List<String> fakeStreams,
+									  Properties fakeConfiguration,
+									  int fakeTotalCountOfSubtasks,
+									  int fakeTndexOfThisSubtask,
+									  AtomicReference<Throwable> thrownErrorUnderTest,
+									  LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest,
+									  HashMap<String, String> subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
+									  KinesisProxyInterface fakeKinesis) {
+		super(fakeStreams,
+			getMockedSourceContext(),
+			fakeCheckpointLock,
+			getMockedRuntimeContext(fakeTotalCountOfSubtasks, fakeTndexOfThisSubtask),
+			fakeConfiguration,
+			new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
+			thrownErrorUnderTest,
+			subscribedShardsStateUnderTest,
+			subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
+			fakeKinesis);
+
+		this.numElementsCollected = 0;
+	}
+
+	public long getNumOfElementsCollected() {
+		return numElementsCollected;
+	}
+
+	@Override
+	protected KinesisDeserializationSchema<String> getClonedDeserializationSchema() {
+		return new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema());
+	}
+
+	@Override
+	protected void emitRecordAndUpdateState(String record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
+		synchronized (fakeCheckpointLock) {
+			this.numElementsCollected++;
+			updateState(shardStateIndex, lastSequenceNumber);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private static SourceFunction.SourceContext<String> getMockedSourceContext() {
+		return Mockito.mock(SourceFunction.SourceContext.class);
+	}
+
+	private static RuntimeContext getMockedRuntimeContext(final int fakeTotalCountOfSubtasks, final int fakeTndexOfThisSubtask) {
+		RuntimeContext mockedRuntimeContext = Mockito.mock(RuntimeContext.class);
+
+		Mockito.when(mockedRuntimeContext.getNumberOfParallelSubtasks()).thenAnswer(new Answer<Integer>() {
+			@Override
+			public Integer answer(InvocationOnMock invocationOnMock) throws Throwable {
+				return fakeTotalCountOfSubtasks;
+			}
+		});
+
+		Mockito.when(mockedRuntimeContext.getIndexOfThisSubtask()).thenAnswer(new Answer<Integer>() {
+			@Override
+			public Integer answer(InvocationOnMock invocationOnMock) throws Throwable {
+				return fakeTndexOfThisSubtask;
+			}
+		});
+
+		Mockito.when(mockedRuntimeContext.getTaskName()).thenAnswer(new Answer<String>() {
+			@Override
+			public String answer(InvocationOnMock invocationOnMock) throws Throwable {
+				return "Fake Task";
+			}
+		});
+
+		Mockito.when(mockedRuntimeContext.getTaskNameWithSubtasks()).thenAnswer(new Answer<String>() {
+			@Override
+			public String answer(InvocationOnMock invocationOnMock) throws Throwable {
+				return "Fake Task (" + fakeTndexOfThisSubtask + "/" + fakeTotalCountOfSubtasks + ")";
+			}
+		});
+
+		return mockedRuntimeContext;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-nifi/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-nifi/pom.xml b/flink-connectors/flink-connector-nifi/pom.xml
new file mode 100644
index 0000000..e04a63a
--- /dev/null
+++ b/flink-connectors/flink-connector-nifi/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.2-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-nifi_2.10</artifactId>
+	<name>flink-connector-nifi</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<nifi.version>0.6.1</nifi.version>
+	</properties>
+
+	<dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-site-to-site-client</artifactId>
+            <version>${nifi.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.10</artifactId>
+            <version>${project.version}</version>
+			<scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-tests_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_2.10</artifactId>
+            <version>${project.version}</version>
+			<type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<rerunFailingTestsCount>3</rerunFailingTestsCount>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
new file mode 100644
index 0000000..c8ceb57
--- /dev/null
+++ b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import java.util.Map;
+
+/**
+ * <p>
+ * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both
+ * a FlowFile's content and its attributes so that they can be processed by Flink.
+ * </p>
+ */
+public interface NiFiDataPacket {
+
+	/**
+	 * @return the contents of a NiFi FlowFile
+	 */
+	byte[] getContent();
+
+	/**
+	 * @return a Map of attributes that are associated with the NiFi FlowFile
+	 */
+	Map<String, String> getAttributes();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
new file mode 100644
index 0000000..9bb521b
--- /dev/null
+++ b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import java.io.Serializable;
+
+/**
+ * A function that can create a NiFiDataPacket from an incoming instance of the given type.
+ *
+ * @param <T>
+ */
+public interface NiFiDataPacketBuilder<T> extends Function, Serializable {
+
+	NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
new file mode 100644
index 0000000..abc6b35
--- /dev/null
+++ b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+
+/**
+ * A sink that delivers data to Apache NiFi using the NiFi Site-to-Site client. The sink requires
+ * a NiFiDataPacketBuilder which can create instances of NiFiDataPacket from the incoming data.
+ */
+public class NiFiSink<T> extends RichSinkFunction<T> {
+
+	private SiteToSiteClient client;
+	private SiteToSiteClientConfig clientConfig;
+	private NiFiDataPacketBuilder<T> builder;
+
+	/**
+	 * Construct a new NiFiSink with the given client config and NiFiDataPacketBuilder.
+	 *
+	 * @param clientConfig the configuration for building a NiFi SiteToSiteClient
+	 * @param builder a builder to produce NiFiDataPackets from incoming data
+	 */
+	public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder<T> builder) {
+		this.clientConfig = clientConfig;
+		this.builder = builder;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		this.client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
+	}
+
+	@Override
+	public void invoke(T value) throws Exception {
+		final NiFiDataPacket niFiDataPacket = builder.createNiFiDataPacket(value, getRuntimeContext());
+
+		final Transaction transaction = client.createTransaction(TransferDirection.SEND);
+		if (transaction == null) {
+			throw new IllegalStateException("Unable to create a NiFi Transaction to send data");
+		}
+
+		transaction.send(niFiDataPacket.getContent(), niFiDataPacket.getAttributes());
+		transaction.confirm();
+		transaction.complete();
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		client.close();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
new file mode 100644
index 0000000..57c59ec
--- /dev/null
+++ b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.nifi;
+
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A source that pulls data from Apache NiFi using the NiFi Site-to-Site client. This source
+ * produces NiFiDataPackets which encapsulate the content and attributes of a NiFi FlowFile.
+ */
+public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> implements StoppableFunction{
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(NiFiSource.class);
+
+	private static final long DEFAULT_WAIT_TIME_MS = 1000;
+
+	// ------------------------------------------------------------------------
+
+	private final SiteToSiteClientConfig clientConfig;
+
+	private final long waitTimeMs;
+
+	private transient SiteToSiteClient client;
+
+	private volatile boolean isRunning = true;
+
+	/**
+	 * Constructs a new NiFiSource using the given client config and the default wait time of 1000 ms.
+	 *
+	 * @param clientConfig the configuration for building a NiFi SiteToSiteClient
+	 */
+	public NiFiSource(SiteToSiteClientConfig clientConfig) {
+		this(clientConfig, DEFAULT_WAIT_TIME_MS);
+	}
+
+	/**
+	 * Constructs a new NiFiSource using the given client config and wait time.
+	 *
+	 * @param clientConfig the configuration for building a NiFi SiteToSiteClient
+	 * @param waitTimeMs the amount of time to wait (in milliseconds) if no data is available to pull from NiFi
+	 */
+	public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs) {
+		this.clientConfig = clientConfig;
+		this.waitTimeMs = waitTimeMs;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
+	}
+
+	@Override
+	public void run(SourceContext<NiFiDataPacket> ctx) throws Exception {
+		while (isRunning) {
+			final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
+			if (transaction == null) {
+				LOG.warn("A transaction could not be created, waiting and will try again...");
+				try {
+					Thread.sleep(waitTimeMs);
+				} catch (InterruptedException ignored) {
+
+				}
+				continue;
+			}
+
+			DataPacket dataPacket = transaction.receive();
+			if (dataPacket == null) {
+				transaction.confirm();
+				transaction.complete();
+
+				LOG.debug("No data available to pull, waiting and will try again...");
+				try {
+					Thread.sleep(waitTimeMs);
+				} catch (InterruptedException ignored) {
+
+				}
+				continue;
+			}
+
+			final List<NiFiDataPacket> niFiDataPackets = new ArrayList<>();
+			do {
+				// Read the data into a byte array and wrap it along with the attributes
+				// into a NiFiDataPacket.
+				final InputStream inStream = dataPacket.getData();
+				final byte[] data = new byte[(int) dataPacket.getSize()];
+				StreamUtils.fillBuffer(inStream, data);
+
+				final Map<String, String> attributes = dataPacket.getAttributes();
+
+				niFiDataPackets.add(new StandardNiFiDataPacket(data, attributes));
+				dataPacket = transaction.receive();
+			} while (dataPacket != null);
+
+			// Confirm transaction to verify the data
+			transaction.confirm();
+
+			for (NiFiDataPacket dp : niFiDataPackets) {
+				ctx.collect(dp);
+			}
+
+			transaction.complete();
+		}
+	}
+
+	@Override
+	public void cancel() {
+		isRunning = false;
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		client.close();
+	}
+
+	@Override
+	public void stop() {
+		this.isRunning = false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
new file mode 100644
index 0000000..5ad4bae
--- /dev/null
+++ b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * An implementation of NiFiDataPacket.
+ */
+public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
+	private static final long serialVersionUID = 6364005260220243322L;
+
+	private final byte[] content;
+	private final Map<String, String> attributes;
+
+	public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes) {
+		this.content = content;
+		this.attributes = attributes;
+	}
+
+	@Override
+	public byte[] getContent() {
+		return content;
+	}
+
+	@Override
+	public Map<String, String> getAttributes() {
+		return attributes;
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
new file mode 100644
index 0000000..572f949
--- /dev/null
+++ b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
@@ -0,0 +1,55 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.connectors.nifi.examples;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
+import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
+import org.apache.flink.streaming.connectors.nifi.NiFiSink;
+import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+
+import java.util.HashMap;
+
+/**
+ * An example topology that sends data to a NiFi input port named "Data from Flink".
+ */
+public class NiFiSinkTopologyExample {
+
+	public static void main(String[] args) throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+				.url("http://localhost:8080/nifi")
+				.portName("Data from Flink")
+				.buildConfig();
+
+		DataStreamSink<String> dataStream = env.fromElements("one", "two", "three", "four", "five", "q")
+				.addSink(new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<String>() {
+					@Override
+					public NiFiDataPacket createNiFiDataPacket(String s, RuntimeContext ctx) {
+						return new StandardNiFiDataPacket(s.getBytes(), new HashMap<String,String>());
+					}
+				}));
+
+		env.execute();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
new file mode 100644
index 0000000..79c9a1c
--- /dev/null
+++ b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
@@ -0,0 +1,58 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.connectors.nifi.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
+import org.apache.flink.streaming.connectors.nifi.NiFiSource;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+
+import java.nio.charset.Charset;
+
+/**
+ * An example topology that pulls data from a NiFi output port named "Data for Flink".
+ */
+public class NiFiSourceTopologyExample {
+
+	public static void main(String[] args) throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+				.url("http://localhost:8080/nifi")
+				.portName("Data for Flink")
+				.requestBatchCount(5)
+				.buildConfig();
+
+		SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
+		DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource).setParallelism(2);
+
+		DataStream<String> dataStream = streamSource.map(new MapFunction<NiFiDataPacket, String>() {
+			@Override
+			public String map(NiFiDataPacket value) throws Exception {
+				return new String(value.getContent(), Charset.defaultCharset());
+			}
+		});
+
+		dataStream.print();
+		env.execute();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml b/flink-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
new file mode 100644
index 0000000..d373d63
--- /dev/null
+++ b/flink-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
@@ -0,0 +1,16 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<template><description></description><name>NiFi_Flink</name><snippet><connections><id>34acfdda-dd21-48c0-8779-95d0e258f5cb</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>8b8b6a2f-ee24-4f32-a178-248f3a3f5482</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>769242e5-ee04-4656-a684-ca661a18eed6</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>59574e3b-1ba7-4343-b265-af1b67923a85</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThresh
 old>0</backPressureObjectThreshold><destination><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>48042218-a51e-45c7-bd30-2290bba8b191</id><type>OUTPUT_PORT</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>8b8b6a2f-ee24-4f32-a178-248f3a3f5482</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>46c9343f-f732-4e2d-98e1-13caab5d2f5e</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>e3a9026f-dae1-42ca-851c-02d9fda22094</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><source><groupI
 d>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>cd8c5227-cfa9-4603-9472-b2234d7bd741</id><type>INPUT_PORT</type></source><zIndex>0</zIndex></connections><inputPorts><id>cd8c5227-cfa9-4603-9472-b2234d7bd741</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>395.0</x><y>520.0</y></position><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><name>Data from Flink</name><state>RUNNING</state><transmitting>false</transmitting><type>INPUT_PORT</type></inputPorts><outputPorts><id>48042218-a51e-45c7-bd30-2290bba8b191</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>1616.0</x><y>259.0</y></position><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><name>Data for Flink</name><state>RUNNING</state><transmitting>false</transmitting><type>OUTPUT_PORT</type></outputPorts><processors><id>769242e5-ee04-4656-a684-ca661a18eed6</id><parentGroupId>0f854f2b-239f-45f0-bfed-48
 b5b23f7928</parentGroupId><position><x>389.0</x><y>231.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>File Size</key><value><description>The size of the file that will be used</description><displayName>File Size</displayName><dynamic>false</dynamic><name>File Size</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Batch Size</key><value><defaultValue>1</defaultValue><description>The number of FlowFiles to be transferr
 ed in each invocation</description><displayName>Batch Size</displayName><dynamic>false</dynamic><name>Batch Size</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Data Format</key><value><allowableValues><displayName>Binary</displayName><value>Binary</value></allowableValues><allowableValues><displayName>Text</displayName><value>Text</value></allowableValues><defaultValue>Binary</defaultValue><description>Specifies whether the data should be Text or Binary</description><displayName>Data Format</displayName><dynamic>false</dynamic><name>Data Format</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Unique FlowFiles</key><value><allowableValues><displayName>true</displayName><value>true</value></allowableValues><allowableValues><displayName>false</displayName><value>false</value></allowableValues><defaultValue>false</defaultValue><description>If true, ea
 ch FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles will get the same content but this offers much higher throughput</description><displayName>Unique FlowFiles</displayName><dynamic>false</dynamic><name>Unique FlowFiles</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>File Size</key><value>1 b</value></entry><entry><key>Batch Size</key><value>1</value></entry><entry><key>Data Format</key><value>Binary</value></entry><entry><key>Unique FlowFiles</key><value>false</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>2 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>GenerateFlowFile</name><relationships><autoTerminate>false</autoTerminate><description></des
 cription><name>success</name></relationships><state>STOPPED</state><style/><supportsEventDriven>false</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.GenerateFlowFile</type></processors><processors><id>e3a9026f-dae1-42ca-851c-02d9fda22094</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>826.0</x><y>499.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>Log Level</key><value><al
 lowableValues><displayName>trace</displayName><value>trace</value></allowableValues><allowableValues><displayName>debug</displayName><value>debug</value></allowableValues><allowableValues><displayName>info</displayName><value>info</value></allowableValues><allowableValues><displayName>warn</displayName><value>warn</value></allowableValues><allowableValues><displayName>error</displayName><value>error</value></allowableValues><defaultValue>info</defaultValue><description>The Log Level to use when logging the Attributes</description><displayName>Log Level</displayName><dynamic>false</dynamic><name>Log Level</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Log Payload</key><value><allowableValues><displayName>true</displayName><value>true</value></allowableValues><allowableValues><displayName>false</displayName><value>false</value></allowableValues><defaultValue>false</defaultValue><description>If true, the FlowFile's p
 ayload will be logged, in addition to its attributes; otherwise, just the Attributes will be logged.</description><displayName>Log Payload</displayName><dynamic>false</dynamic><name>Log Payload</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Attributes to Log</key><value><description>A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.</description><displayName>Attributes to Log</displayName><dynamic>false</dynamic><name>Attributes to Log</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Attributes to Ignore</key><value><description>A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored.</description><displayName>Attributes to Ignore</displayName><dynamic>false</dynamic><name>Attributes to Ignore</name><required>false</required><sensitive>false</sensitive><supportsEl>
 false</supportsEl></value></entry><entry><key>Log prefix</key><value><description>Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.</description><displayName>Log prefix</displayName><dynamic>false</dynamic><name>Log prefix</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Log Level</key></entry><entry><key>Log Payload</key><value>true</value></entry><entry><key>Attributes to Log</key></entry><entry><key>Attributes to Ignore</key></entry><entry><key>Log prefix</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>LogAttribute</name><relationships><autoTerminate>true</autoTerminate><description>All FlowFil
 es are routed to this relationship</description><name>success</name></relationships><state>RUNNING</state><style/><supportsEventDriven>true</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.LogAttribute</type></processors><processors><id>8b8b6a2f-ee24-4f32-a178-248f3a3f5482</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>1000.0</x><y>231.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><e
 ntry><key>Regular Expression</key><value><defaultValue>(?s:^.*$)</defaultValue><description>The Regular Expression to search for in the FlowFile content</description><displayName>Regular Expression</displayName><dynamic>false</dynamic><name>Regular Expression</name><required>true</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Replacement Value</key><value><defaultValue>$1</defaultValue><description>The value to replace the regular expression with. Back-references to Regular Expression capturing groups are supported, but back-references that reference capturing groups that do not exist in the regular expression will be treated as literal value.</description><displayName>Replacement Value</displayName><dynamic>false</dynamic><name>Replacement Value</name><required>true</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Character Set</key><value><defaultValue>UTF-8</defaultValue><description>The
  Character Set in which the file is encoded</description><displayName>Character Set</displayName><dynamic>false</dynamic><name>Character Set</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Maximum Buffer Size</key><value><defaultValue>1 MB</defaultValue><description>Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) in order to apply the regular expressions. If 'Entire Text' (in Evaluation Mode) is selected and the FlowFile is larger than this value, the FlowFile will be routed to 'failure'. In 'Line-by-Line' Mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. A default value of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' Mode, a value such as 8 KB or 16 KB is suggested. This value is ignored and the buffer is not used if 'Regular Expression' is set to '.*'</description><displayName>Maximum Buffer 
 Size</displayName><dynamic>false</dynamic><name>Maximum Buffer Size</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Evaluation Mode</key><value><allowableValues><displayName>Line-by-Line</displayName><value>Line-by-Line</value></allowableValues><allowableValues><displayName>Entire text</displayName><value>Entire text</value></allowableValues><defaultValue>Entire text</defaultValue><description>Evaluate the 'Regular Expression' against each line (Line-by-Line) or buffer the entire file into memory (Entire Text) and then evaluate the 'Regular Expression'.</description><displayName>Evaluation Mode</displayName><dynamic>false</dynamic><name>Evaluation Mode</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Regular Expression</key><value>(?s:^.*$)</value><
 /entry><entry><key>Replacement Value</key><value>blah blah</value></entry><entry><key>Character Set</key><value>UTF-8</value></entry><entry><key>Maximum Buffer Size</key><value>1 MB</value></entry><entry><key>Evaluation Mode</key><value>Entire text</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>ReplaceText</name><relationships><autoTerminate>true</autoTerminate><description>FlowFiles that could not be updated are routed to this relationship</description><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><description>FlowFiles that have been successfully updated are routed to this relationship, as well as FlowFiles whose content does not match the given Regular Expression</description><name>success</name></relationships><state>RUNNING</state><style/><supportsEventDriven>true</supports
 EventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.ReplaceText</type></processors></snippet><timestamp>09/30/2015 09:10:38 EDT</timestamp></template>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/pom.xml b/flink-connectors/flink-connector-rabbitmq/pom.xml
new file mode 100644
index 0000000..0b69d66
--- /dev/null
+++ b/flink-connectors/flink-connector-rabbitmq/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.2-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-rabbitmq_2.10</artifactId>
+	<name>flink-connector-rabbitmq</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<rabbitmq.version>3.3.1</rabbitmq.version>
+	</properties>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.rabbitmq</groupId>
+			<artifactId>amqp-client</artifactId>
+			<version>${rabbitmq.version}</version>
+		</dependency>
+
+	</dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
new file mode 100644
index 0000000..a0795d6
--- /dev/null
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import java.io.IOException;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+/**
+ * A Sink for publishing data into RabbitMQ
+ * @param <IN>
+ */
+public class RMQSink<IN> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);
+
+	protected final String queueName;
+	private final RMQConnectionConfig rmqConnectionConfig;
+	protected transient Connection connection;
+	protected transient Channel channel;
+	protected SerializationSchema<IN> schema;
+	private boolean logFailuresOnly = false;
+
+	/**
+	 * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}.
+	 * @param queueName The queue to publish messages to.
+	 * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes
+     */
+	public RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema<IN> schema) {
+		this.rmqConnectionConfig = rmqConnectionConfig;
+		this.queueName = queueName;
+		this.schema = schema;
+	}
+
+	/**
+	 * Sets up the queue. The default implementation just declares the queue. The user may override
+	 * this method to have a custom setup for the queue (i.e. binding the queue to an exchange or
+	 * defining custom queue parameters)
+	 */
+	protected void setupQueue() throws IOException {
+		channel.queueDeclare(queueName, false, false, false, null);
+	}
+
+	/**
+	 * Defines whether the producer should fail on errors, or only log them.
+	 * If this is set to true, then exceptions will be only logged, if set to false,
+	 * exceptions will be eventually thrown and cause the streaming program to
+	 * fail (and enter recovery).
+	 *
+	 * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+	 */
+	public void setLogFailuresOnly(boolean logFailuresOnly) {
+		this.logFailuresOnly = logFailuresOnly;
+	}
+
+
+	@Override
+	public void open(Configuration config) throws Exception {
+		ConnectionFactory factory = rmqConnectionConfig.getConnectionFactory();
+		try {
+			connection = factory.newConnection();
+			channel = connection.createChannel();
+			if (channel == null) {
+				throw new RuntimeException("None of RabbitMQ channels are available");
+			}
+			setupQueue();
+		} catch (IOException e) {
+			throw new RuntimeException("Error while creating the channel", e);
+		}
+	}
+
+	/**
+	 * Called when new data arrives to the sink, and forwards it to RMQ.
+	 *
+	 * @param value
+	 *            The incoming data
+	 */
+	@Override
+	public void invoke(IN value) {
+		try {
+			byte[] msg = schema.serialize(value);
+
+			channel.basicPublish("", queueName, null, msg);
+		} catch (IOException e) {
+			if (logFailuresOnly) {
+				LOG.error("Cannot send RMQ message {} at {}", queueName, rmqConnectionConfig.getHost(), e);
+			} else {
+				throw new RuntimeException("Cannot send RMQ message " + queueName +" at " + rmqConnectionConfig.getHost(), e);
+			}
+		}
+
+	}
+
+	@Override
+	public void close() {
+		IOException t = null;
+		try {
+			channel.close();
+		} catch (IOException e) {
+			t = e;
+		}
+
+		try {
+			connection.close();
+		} catch (IOException e) {
+			if(t != null) {
+				LOG.warn("Both channel and connection closing failed. Logging channel exception and failing with connection exception", t);
+			}
+			t = e;
+		}
+		if(t != null) {
+			throw new RuntimeException("Error while closing RMQ connection with " + queueName
+					+ " at " + rmqConnectionConfig.getHost(), t);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
new file mode 100644
index 0000000..ee9c3b9
--- /dev/null
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.util.Preconditions;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.QueueingConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RabbitMQ source (consumer) which reads from a queue and acknowledges messages on checkpoints.
+ * When checkpointing is enabled, it guarantees exactly-once processing semantics.
+ *
+ * RabbitMQ requires messages to be acknowledged. On failures, RabbitMQ will re-resend all messages
+ * which have not been acknowledged previously. When a failure occurs directly after a completed
+ * checkpoint, all messages part of this checkpoint might be processed again because they couldn't
+ * be acknowledged before failure. This case is handled by the {@link MessageAcknowledgingSourceBase}
+ * base class which deduplicates the messages using the correlation id.
+ *
+ * RabbitMQ's Delivery Tags do NOT represent unique ids / offsets. That's why the source uses the
+ * Correlation ID in the message properties to check for duplicate messages. Note that the
+ * correlation id has to be set at the producer. If the correlation id is not set, messages may
+ * be produced more than once in corner cases.
+ *
+ * This source can be operated in three different modes:
+ *
+ * 1) Exactly-once (when checkpointed) with RabbitMQ transactions and messages with
+ *    unique correlation IDs.
+ * 2) At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication mechanism
+ *    (correlation id is not set).
+ * 3) No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.
+ *
+ * Users may overwrite the setupConnectionFactory() method to pass their setup their own
+ * ConnectionFactory in case the constructor parameters are not sufficient.
+ *
+ * @param <OUT> The type of the data read from RabbitMQ.
+ */
+public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, Long>
+	implements ResultTypeQueryable<OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class);
+
+	private final RMQConnectionConfig rmqConnectionConfig;
+	protected final String queueName;
+	private final boolean usesCorrelationId;
+	protected DeserializationSchema<OUT> schema;
+
+	protected transient Connection connection;
+	protected transient Channel channel;
+	protected transient QueueingConsumer consumer;
+
+	protected transient boolean autoAck;
+
+	private transient volatile boolean running;
+
+	/**
+	 * Creates a new RabbitMQ source with at-least-once message processing guarantee when
+	 * checkpointing is enabled. No strong delivery guarantees when checkpointing is disabled.
+	 * For exactly-once, please use the constructor
+	 * {@link RMQSource#RMQSource(RMQConnectionConfig, String, boolean usesCorrelationId, DeserializationSchema)},
+	 * set {@param usesCorrelationId} to true and enable checkpointing.
+	 * @param rmqConnectionConfig The RabbiMQ connection configuration {@link RMQConnectionConfig}.
+	 * @param queueName  The queue to receive messages from.
+	 * @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received
+	 *               				into Java objects.
+	 */
+	public RMQSource(RMQConnectionConfig rmqConnectionConfig, String queueName,
+					DeserializationSchema<OUT> deserializationSchema) {
+		this(rmqConnectionConfig, queueName, false, deserializationSchema);
+	}
+
+	/**
+	 * Creates a new RabbitMQ source. For exactly-once, you must set the correlation ids of messages
+	 * at the producer. The correlation id must be unique. Otherwise the behavior of the source is
+	 * undefined. In doubt, set {@param usesCorrelationId} to false. When correlation ids are not
+	 * used, this source has at-least-once processing semantics when checkpointing is enabled.
+	 * @param rmqConnectionConfig The RabbiMQ connection configuration {@link RMQConnectionConfig}.
+	 * @param queueName The queue to receive messages from.
+	 * @param usesCorrelationId Whether the messages received are supplied with a <b>unique</b>
+	 *                          id to deduplicate messages (in case of failed acknowledgments).
+	 *                          Only used when checkpointing is enabled.
+	 * @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received
+	 *                              into Java objects.
+	 */
+	public RMQSource(RMQConnectionConfig rmqConnectionConfig,
+					String queueName, boolean usesCorrelationId,DeserializationSchema<OUT> deserializationSchema) {
+		super(String.class);
+		this.rmqConnectionConfig = rmqConnectionConfig;
+		this.queueName = queueName;
+		this.usesCorrelationId = usesCorrelationId;
+		this.schema = deserializationSchema;
+	}
+
+	/**
+	 * Initializes the connection to RMQ with a default connection factory. The user may override
+	 * this method to setup and configure their own ConnectionFactory.
+	 */
+	protected ConnectionFactory setupConnectionFactory() throws Exception {
+		return rmqConnectionConfig.getConnectionFactory();
+	}
+
+	/**
+	 * Sets up the queue. The default implementation just declares the queue. The user may override
+	 * this method to have a custom setup for the queue (i.e. binding the queue to an exchange or
+	 * defining custom queue parameters)
+	 */
+	protected void setupQueue() throws IOException {
+		channel.queueDeclare(queueName, true, false, false, null);
+	}
+
+	@Override
+	public void open(Configuration config) throws Exception {
+		super.open(config);
+		ConnectionFactory factory = setupConnectionFactory();
+		try {
+			connection = factory.newConnection();
+			channel = connection.createChannel();
+			if (channel == null) {
+				throw new RuntimeException("None of RabbitMQ channels are available");
+			}
+			setupQueue();
+			consumer = new QueueingConsumer(channel);
+
+			RuntimeContext runtimeContext = getRuntimeContext();
+			if (runtimeContext instanceof StreamingRuntimeContext
+					&& ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled()) {
+				autoAck = false;
+				// enables transaction mode
+				channel.txSelect();
+			} else {
+				autoAck = true;
+			}
+
+			LOG.debug("Starting RabbitMQ source with autoAck status: " + autoAck);
+			channel.basicConsume(queueName, autoAck, consumer);
+
+		} catch (IOException e) {
+			throw new RuntimeException("Cannot create RMQ connection with " + queueName + " at "
+					+ rmqConnectionConfig.getHost(), e);
+		}
+		running = true;
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		try {
+			connection.close();
+		} catch (IOException e) {
+			throw new RuntimeException("Error while closing RMQ connection with " + queueName
+				+ " at " + rmqConnectionConfig.getHost(), e);
+		}
+	}
+
+
+	@Override
+	public void run(SourceContext<OUT> ctx) throws Exception {
+		while (running) {
+			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
+
+			synchronized (ctx.getCheckpointLock()) {
+
+				OUT result = schema.deserialize(delivery.getBody());
+
+				if (schema.isEndOfStream(result)) {
+					break;
+				}
+
+				if (!autoAck) {
+					final long deliveryTag = delivery.getEnvelope().getDeliveryTag();
+					if (usesCorrelationId) {
+						final String correlationId = delivery.getProperties().getCorrelationId();
+						Preconditions.checkNotNull(correlationId, "RabbitMQ source was instantiated " +
+							"with usesCorrelationId set to true but a message was received with " +
+							"correlation id set to null!");
+						if (!addId(correlationId)) {
+							// we have already processed this message
+							continue;
+						}
+					}
+					sessionIds.add(deliveryTag);
+				}
+
+				ctx.collect(result);
+			}
+		}
+	}
+
+	@Override
+	public void cancel() {
+		running = false;
+	}
+
+	@Override
+	protected void acknowledgeSessionIDs(List<Long> sessionIds) {
+		try {
+			for (long id : sessionIds) {
+				channel.basicAck(id, false);
+			}
+			channel.txCommit();
+		} catch (IOException e) {
+			throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", e);
+		}
+	}
+
+	@Override
+	public TypeInformation<OUT> getProducedType() {
+		return schema.getProducedType();
+	}
+}


Mime
View raw message