flink-commits mailing list archives

From: m..@apache.org
Subject: [1/2] flink git commit: [FLINK-2624] improvements to the RabbitMQ source
Date: Tue, 01 Dec 2015 14:45:54 GMT
Repository: flink
Updated Branches:
  refs/heads/master 31f6a744c -> 9215b7242


[FLINK-2624] improvements to the RabbitMQ source

The RabbitMQ source may operate in three different modes:

1) Exactly-once (when checkpointed) with RabbitMQ transactions and messages with
   unique correlation IDs.

2) At-least-once (when checkpointed) with RabbitMQ transactions but no
   deduplication mechanism (correlation id is not set).

3) No strong delivery guarantees (without checkpointing) with RabbitMQ
   auto-commit mode.

- add base class which can handle both session ids and unique ids
  - session ids are used for acknowledgment
  - unique ids are used for de-duplication

- add unit test

- add documentation
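
For illustration, a minimal usage sketch of the three modes described above (host, queue name, and
checkpoint interval are placeholders; only the RMQSource constructors themselves come from this
commit):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class RMQSourceModes {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // 1) Exactly-once: enable checkpointing and pass usesCorrelationId = true;
            //    the producer must attach a unique correlation id to every message.
            env.enableCheckpointing(5000);
            env.addSource(new RMQSource<String>("localhost", "hello", true, new SimpleStringSchema()))
               .print();

            // 2) At-least-once: checkpointing enabled, correlation ids not used
            //    (constructor without the boolean flag):
            // env.addSource(new RMQSource<String>("localhost", "hello", new SimpleStringSchema())).print();

            // 3) No strong delivery guarantees: leave checkpointing disabled (RabbitMQ auto-ack).

            env.execute("RabbitMQ source example");
        }
    }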

This closes #1243.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9215b724
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9215b724
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9215b724

Branch: refs/heads/master
Commit: 9215b72422d3e638fe950b61fa01f2e4e04981a0
Parents: b630580
Author: Maximilian Michels <mxm@apache.org>
Authored: Mon Nov 30 15:04:31 2015 +0100
Committer: Maximilian Michels <mxm@apache.org>
Committed: Tue Dec 1 15:44:47 2015 +0100

----------------------------------------------------------------------
 docs/apis/cluster_execution.md                  |   4 +-
 docs/apis/streaming_guide.md                    |  35 +-
 .../flink-connector-rabbitmq/pom.xml            |   6 +
 .../connectors/rabbitmq/RMQSource.java          | 163 ++++++++--
 .../connectors/rabbitmq/RMQSourceTest.java      | 326 +++++++++++++++++++
 .../source/MessageAcknowledgingSourceBase.java  | 207 ++++++++++++
 .../source/MessageAcknowledingSourceBase.java   | 188 -----------
 ...ltipleIdsMessageAcknowledgingSourceBase.java | 139 ++++++++
 8 files changed, 838 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9215b724/docs/apis/cluster_execution.md
----------------------------------------------------------------------
diff --git a/docs/apis/cluster_execution.md b/docs/apis/cluster_execution.md
index 80c15a0..54e1b41 100644
--- a/docs/apis/cluster_execution.md
+++ b/docs/apis/cluster_execution.md
@@ -89,8 +89,8 @@ added modules. To run code depending on these modules you need to make them acce
 during runtime, for which we suggest two options:
 
 1. Either copy the required jar files to the `lib` folder onto all of your TaskManagers.
-Note that you have to restar your TaskManagers after this.
-2. Or package them with your usercode.
+Note that you have to restart your TaskManagers after this.
+2. Or package them with your code.
 
 The latter version is recommended as it respects the classloader management in Flink.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9215b724/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 366de22..6197493 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -3687,29 +3687,52 @@ Follow the instructions from the [RabbitMQ download page](http://www.rabbitmq.co
 
 #### RabbitMQ Source
 
-A class providing an interface for receiving data from RabbitMQ.
+A class which provides an interface for receiving data from RabbitMQ.
 
 The following have to be provided for the `RMQSource(…)` constructor in order:
 
-1. The hostname
-2. The queue name
-3. Deserialization schema
+- hostName: The RabbitMQ broker hostname.
+- queueName: The RabbitMQ queue name.
+- usesCorrelationId: `true` when correlation ids should be used, `false` otherwise (default is `false`).
+- deserializationSchema: Deserialization schema to turn messages into Java objects.
+
+This source can be operated in three different modes:
+
+1. Exactly-once (when checkpointed) with RabbitMQ transactions and messages with
+    unique correlation IDs.
+2. At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication mechanism
+    (correlation id is not set).
+3. No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.
+
+Correlation ids are a RabbitMQ application feature. You have to set the correlation id in the
+message properties when injecting messages into RabbitMQ. If you set `usesCorrelationId` to true
+and do not supply unique correlation ids, the source will throw an exception (if the correlation id
+is null) or ignore messages with non-unique correlation ids. If you set `usesCorrelationId` to
+false, you don't have to supply correlation ids.
 
 Example:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-DataStream<String> stream = env
+DataStream<String> streamWithoutCorrelationIds = env
 	.addSource(new RMQSource<String>("localhost", "hello", new SimpleStringSchema()))
 	.print
+
+DataStream<String> streamWithCorrelationIds = env
+	.addSource(new RMQSource<String>("localhost", "hello", true, new SimpleStringSchema()))
+	.print
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-stream = env
+streamWithoutCorrelationIds = env
     .addSource(new RMQSource[String]("localhost", "hello", new SimpleStringSchema))
     .print
+
+streamWithCorrelationIds = env
+    .addSource(new RMQSource[String]("localhost", "hello", true, new SimpleStringSchema))
+    .print
 {% endhighlight %}
 </div>
 </div>
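
Since the correlation id has to be attached on the producer side, here is a hedged sketch of a
producer using the plain RabbitMQ Java client; the host, queue name, and payload are placeholders:

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.util.UUID;

    public class CorrelatedProducer {
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare("hello", true, false, false, null);

            // A unique correlation id per message lets the checkpointed source
            // deduplicate messages that RabbitMQ redelivers after a failure.
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .correlationId(UUID.randomUUID().toString())
                    .build();
            channel.basicPublish("", "hello", props, "payload".getBytes());

            channel.close();
            connection.close();
        }
    }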

http://git-wip-us.apache.org/repos/asf/flink/blob/9215b724/flink-streaming-connectors/flink-connector-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/pom.xml b/flink-streaming-connectors/flink-connector-rabbitmq/pom.xml
index f2fdd4e..2e98e3b 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/pom.xml
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/pom.xml
@@ -54,6 +54,12 @@ under the License.
 			<version>${rabbitmq.version}</version>
 		</dependency>
 
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
+
 	</dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/9215b724/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index e8e867e..09bb07c 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -20,59 +20,135 @@ package org.apache.flink.streaming.connectors.rabbitmq;
 import java.io.IOException;
 import java.util.List;
 
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.MessageAcknowledingSourceBase;
+import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 
-import com.esotericsoftware.minlog.Log;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.QueueingConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RabbitMQ source (consumer) which reads from a queue and acknowledges messages on checkpoints.
+ * When checkpointing is enabled, it guarantees exactly-once processing semantics.
+ *
+ * RabbitMQ requires messages to be acknowledged. On failure, RabbitMQ will re-send all messages
+ * which have not been acknowledged previously. When a failure occurs directly after a completed
+ * checkpoint, all messages belonging to this checkpoint might be processed again because they
+ * couldn't be acknowledged before the failure. This case is handled by the
+ * {@link MessageAcknowledgingSourceBase} base class, which deduplicates the messages using the
+ * correlation id.
+ *
+ * RabbitMQ's Delivery Tags do NOT represent unique ids / offsets. That's why the source uses the
+ * Correlation ID in the message properties to check for duplicate messages. Note that the
+ * correlation id has to be set at the producer. If the correlation id is not set, messages may
+ * be processed more than once in corner cases.
+ *
+ * This source can be operated in three different modes:
+ *
+ * 1) Exactly-once (when checkpointed) with RabbitMQ transactions and messages with
+ *    unique correlation IDs.
+ * 2) At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication mechanism
+ *    (correlation id is not set).
+ * 3) No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.
+ *
+ * @param <OUT> The type of the data read from RabbitMQ.
+ */
+public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, Long>
+		implements ResultTypeQueryable<OUT> {
 
-public class RMQSource<OUT> extends MessageAcknowledingSourceBase<OUT, Long> implements ResultTypeQueryable<OUT>{
 	private static final long serialVersionUID = 1L;
 
-	private final String QUEUE_NAME;
-	private final String HOST_NAME;
+	private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class);
 
-	private transient ConnectionFactory factory;
-	private transient Connection connection;
-	private transient Channel channel;
-	private transient QueueingConsumer consumer;
-	private transient QueueingConsumer.Delivery delivery;
+	private final String hostName;
+	private final String queueName;
+	private final boolean usesCorrelationId;
+	protected DeserializationSchema<OUT> schema;
+
+	protected transient Connection connection;
+	protected transient Channel channel;
+	protected transient QueueingConsumer consumer;
+
+	protected transient boolean autoAck;
 
 	private transient volatile boolean running;
-	
-	protected DeserializationSchema<OUT> schema;
 
-	int count = 0;
-	
-	public RMQSource(String HOST_NAME, String QUEUE_NAME,
+
+	/**
+	 * Creates a new RabbitMQ source with at-least-once message processing guarantee when
+	 * checkpointing is enabled. No strong delivery guarantees when checkpointing is disabled.
+	 * For exactly-once, please use the constructor
+	 * {@link RMQSource#RMQSource(String, String, boolean, DeserializationSchema)},
+	 * set {@code usesCorrelationId} to true, and enable checkpointing.
+	 * @param hostName The RabbitMQ broker's address to connect to.
+	 * @param queueName  The queue to receive messages from.
+	 * @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received
+	 *               				into Java objects.
+	 */
+	public RMQSource(String hostName, String queueName,
+					DeserializationSchema<OUT> deserializationSchema) {
+		this(hostName, queueName, false, deserializationSchema);
+	}
+
+	/**
+	 * Creates a new RabbitMQ source. For exactly-once, you must set the correlation ids of messages
+	 * at the producer. The correlation id must be unique. Otherwise the behavior of the source is
+	 * undefined. If in doubt, set {@code usesCorrelationId} to false. When correlation ids are not
+	 * used, this source has at-least-once processing semantics when checkpointing is enabled.
+	 * @param hostName The RabbitMQ broker's address to connect to.
+	 * @param queueName The queue to receive messages from.
+	 * @param usesCorrelationId Whether the messages received are supplied with a <b>unique</b>
+	 *                          id to deduplicate messages (in case of failed acknowledgments).
+	 *                          Only used when checkpointing is enabled.
+	 * @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received
+	 *                              into Java objects.
+	 */
+	public RMQSource(String hostName, String queueName, boolean usesCorrelationId,
 			DeserializationSchema<OUT> deserializationSchema) {
-		super(Long.class);
+		super(String.class);
+		this.hostName = hostName;
+		this.queueName = queueName;
+		this.usesCorrelationId = usesCorrelationId;
 		this.schema = deserializationSchema;
-		this.HOST_NAME = HOST_NAME;
-		this.QUEUE_NAME = QUEUE_NAME;
 	}
 
 	/**
 	 * Initializes the connection to RMQ.
 	 */
-	private void initializeConnection() {
-		factory = new ConnectionFactory();
-		factory.setHost(HOST_NAME);
+	protected void initializeConnection() {
+		ConnectionFactory factory = new ConnectionFactory();
+		factory.setHost(hostName);
 		try {
 			connection = factory.newConnection();
 			channel = connection.createChannel();
-			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
+			channel.queueDeclare(queueName, true, false, false, null);
 			consumer = new QueueingConsumer(channel);
-			channel.basicConsume(QUEUE_NAME, false, consumer);
+
+			RuntimeContext runtimeContext = getRuntimeContext();
+			if (runtimeContext instanceof StreamingRuntimeContext
+				&& ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled()) {
+				autoAck = false;
+				// enables transaction mode
+				channel.txSelect();
+			} else {
+				autoAck = true;
+			}
+
+			LOG.debug("Starting RabbitMQ source with autoAck status: " + autoAck);
+			channel.basicConsume(queueName, autoAck, consumer);
+
 		} catch (IOException e) {
-			throw new RuntimeException("Cannot create RMQ connection with " + QUEUE_NAME + " at "
-					+ HOST_NAME, e);
+			throw new RuntimeException("Cannot create RMQ connection with " + queueName + " at "
+					+ hostName, e);
 		}
 	}
 
@@ -89,24 +165,40 @@ public class RMQSource<OUT> extends MessageAcknowledingSourceBase<OUT, Long> imp
 		try {
 			connection.close();
 		} catch (IOException e) {
-			throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME
-					+ " at " + HOST_NAME, e);
+			throw new RuntimeException("Error while closing RMQ connection with " + queueName
+					+ " at " + hostName, e);
 		}
 	}
 
+
 	@Override
 	public void run(SourceContext<OUT> ctx) throws Exception {
 		while (running) {
-			delivery = consumer.nextDelivery();
+			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
 
 			synchronized (ctx.getCheckpointLock()) {
+
 				OUT result = schema.deserialize(delivery.getBody());
-				addId(ctx, delivery.getEnvelope().getDeliveryTag());
-				
+
 				if (schema.isEndOfStream(result)) {
 					break;
 				}
-				
+
+				if (!autoAck) {
+					final long deliveryTag = delivery.getEnvelope().getDeliveryTag();
+					if (usesCorrelationId) {
+						final String correlationId = delivery.getProperties().getCorrelationId();
+						Preconditions.checkNotNull(correlationId, "RabbitMQ source was instantiated " +
+							"with usesCorrelationId set to true but a message was received with " +
+							"correlation id set to null!");
+						if (!addId(correlationId)) {
+							// we have already processed this message
+							continue;
+						}
+					}
+					sessionIds.add(deliveryTag);
+				}
+
 				ctx.collect(result);
 			}
 		}
@@ -118,11 +210,14 @@ public class RMQSource<OUT> extends MessageAcknowledingSourceBase<OUT, Long> imp
 	}
 
 	@Override
-	protected void acknowledgeIDs(List<Long> ids) {
+	protected void acknowledgeSessionIDs(List<Long> sessionIds) {
 		try {
-			channel.basicAck(ids.get(ids.size() - 1), true);
+			for (long id : sessionIds) {
+				channel.basicAck(id, false);
+			}
+			channel.txCommit();
 		} catch (IOException e) {
-			Log.error("Messages could not be acknowledged", e);
+			throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", e);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9215b724/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
new file mode 100644
index 0000000..1da9d27
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.QueueingConsumer;
+import junit.framework.Assert;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.SerializedCheckpointData;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.modules.junit4.PowerMockRunner;
+import com.rabbitmq.client.Connection;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+/**
+ * Tests for the RMQSource. The source supports three operation modes:
+ * 1) Exactly-once (when checkpointed) with RabbitMQ transactions and the deduplication mechanism in
+ *    {@link org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase}.
+ * 2) At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication.
+ * 3) No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.
+ *
+ * These tests assume that the message ids increase monotonically. That doesn't have to be the
+ * case in general. The correlation id is used to uniquely identify messages.
+ */
+@RunWith(PowerMockRunner.class)
+public class RMQSourceTest {
+
+	private RMQSource<String> source;
+
+	private Configuration config = new Configuration();
+
+	private Thread sourceThread;
+
+	private volatile long messageId;
+
+	private boolean generateCorrelationIds = true;
+
+	private volatile Exception exception;
+
+	@Before
+	public void beforeTest() throws Exception {
+
+		source = new RMQTestSource<>("hostDummy", "queueDummy", true, new StringDeserializationScheme());
+		source.open(config);
+		source.initializeConnection();
+
+		messageId = 0;
+
+		sourceThread = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					source.run(new DummySourceContext());
+				} catch (Exception e) {
+					exception = e;
+				}
+			}
+		});
+	}
+
+	@After
+	public void afterTest() throws Exception {
+		source.cancel();
+		sourceThread.join();
+	}
+
+	@Test
+	public void testCheckpointing() throws Exception {
+		source.autoAck = false;
+		sourceThread.start();
+
+		Thread.sleep(5);
+
+		final Random random = new Random(System.currentTimeMillis());
+		int numSnapshots = 50;
+		long previousSnapshotId;
+		long lastSnapshotId = 0;
+
+		long totalNumberOfAcks = 0;
+
+		for (int i=0; i < numSnapshots; i++) {
+			long snapshotId = random.nextLong();
+			SerializedCheckpointData[] data;
+
+			synchronized (DummySourceContext.lock) {
+				data = source.snapshotState(snapshotId, System.currentTimeMillis());
+				previousSnapshotId = lastSnapshotId;
+				lastSnapshotId = messageId;
+			}
+			// let some time pass
+			Thread.sleep(5);
+
+			// check if the correct number of messages have been snapshotted
+			final long numIds = lastSnapshotId - previousSnapshotId;
+			assertEquals(numIds, data[0].getNumIds());
+			// deserialize and check if the last id equals the last snapshotted id
+			ArrayDeque<Tuple2<Long, List<String>>> deque = SerializedCheckpointData.toDeque(data, new StringSerializer());
+			List<String> messageIds = deque.getLast().f1;
+			if (messageIds.size() > 0) {
+				assertEquals(lastSnapshotId, (long) Long.valueOf(messageIds.get(messageIds.size() - 1)));
+			}
+
+			// check if the messages are being acknowledged and the transaction committed
+			source.notifyCheckpointComplete(snapshotId);
+			totalNumberOfAcks += numIds;
+
+		}
+
+		Mockito.verify(source.channel, Mockito.times((int) totalNumberOfAcks)).basicAck(Mockito.anyLong(), Mockito.eq(false));
+		Mockito.verify(source.channel, Mockito.times(numSnapshots)).txCommit();
+
+	}
+
+	/**
+	 * Checks whether recurring ids are processed again (they shouldn't be).
+	 */
+	@Test
+	public void testDuplicateId() throws Exception {
+		source.autoAck = false;
+		sourceThread.start();
+
+		while (messageId < 10) {
+			// wait until messages have been processed
+			Thread.sleep(5);
+		}
+
+		long oldMessageId;
+		synchronized (DummySourceContext.lock) {
+			oldMessageId = messageId;
+			messageId = 0;
+		}
+
+		while (messageId < 10) {
+			// process again
+			Thread.sleep(5);
+		}
+
+		synchronized (DummySourceContext.lock) {
+			assertEquals(Math.max(messageId, oldMessageId), DummySourceContext.numElementsCollected);
+		}
+	}
+
+
+	/**
+	 * The source should not acknowledge ids in auto-commit mode or check for previously acknowledged ids
+	 */
+	@Test
+	public void testCheckpointingDisabled() throws Exception {
+		source.autoAck = true;
+		sourceThread.start();
+
+		while (DummySourceContext.numElementsCollected < 50) {
+			// wait until messages have been processed
+			Thread.sleep(5);
+		}
+
+		// see RMQTestSource.addId for the assertion
+	}
+
+	/**
+	 * Tests error reporting in case of invalid correlation ids
+	 */
+	@Test
+	public void testCorrelationIdNotSet() throws InterruptedException {
+		generateCorrelationIds = false;
+		source.autoAck = false;
+		sourceThread.start();
+
+		sourceThread.join();
+
+		assertNotNull(exception);
+		assertTrue(exception instanceof NullPointerException);
+	}
+
+
+	private static class StringDeserializationScheme implements DeserializationSchema<String> {
+
+		@Override
+		public String deserialize(byte[] message) throws IOException {
+			try {
+				// wait a bit to not cause too much cpu load
+				Thread.sleep(1);
+			} catch (InterruptedException e) {
+				e.printStackTrace();
+			}
+			return new String(message);
+		}
+
+		@Override
+		public boolean isEndOfStream(String nextElement) {
+			return false;
+		}
+
+		@Override
+		public TypeInformation<String> getProducedType() {
+			return TypeExtractor.getForClass(String.class);
+		}
+	}
+
+	private class RMQTestSource<OUT> extends RMQSource<OUT> {
+
+		public RMQTestSource(String hostName, String queueName, boolean usesCorrelationIds,
+							 DeserializationSchema<OUT> deserializationSchema) {
+			super(hostName, queueName, usesCorrelationIds, deserializationSchema);
+		}
+
+		@Override
+		protected void initializeConnection() {
+			connection = Mockito.mock(Connection.class);
+			channel = Mockito.mock(Channel.class);
+			consumer = Mockito.mock(QueueingConsumer.class);
+
+			// Mock for delivery
+			final QueueingConsumer.Delivery deliveryMock = Mockito.mock(QueueingConsumer.Delivery.class);
+			Mockito.when(deliveryMock.getBody()).thenReturn("test".getBytes());
+
+			// Mock for envelope
+			Envelope envelope = Mockito.mock(Envelope.class);
+			Mockito.when(deliveryMock.getEnvelope()).thenReturn(envelope);
+
+			try {
+				Mockito.when(consumer.nextDelivery()).thenReturn(deliveryMock);
+			} catch (InterruptedException e) {
+				fail("Couldn't set up deliveryMock");
+			}
+
+			Mockito.when(envelope.getDeliveryTag()).thenAnswer(new Answer<Long>() {
+				@Override
+				public Long answer(InvocationOnMock invocation) throws Throwable {
+					return ++messageId;
+				}
+			});
+
+			// Mock for properties
+			AMQP.BasicProperties props = Mockito.mock(AMQP.BasicProperties.class);
+			Mockito.when(deliveryMock.getProperties()).thenReturn(props);
+
+			Mockito.when(props.getCorrelationId()).thenAnswer(new Answer<String>() {
+				@Override
+				public String answer(InvocationOnMock invocation) throws Throwable {
+					return generateCorrelationIds ? "" + messageId : null;
+				}
+			});
+
+		}
+
+		@Override
+		protected boolean addId(String uid) {
+			assertEquals(false, autoAck);
+			return super.addId(uid);
+		}
+	}
+
+	private static class DummySourceContext implements SourceFunction.SourceContext<String> {
+
+		private static final Object lock = new Object();
+
+		private static long numElementsCollected;
+
+		public DummySourceContext() {
+			numElementsCollected = 0;
+		}
+
+		@Override
+		public void collect(String element) {
+			numElementsCollected++;
+		}
+
+		@Override
+		public void collectWithTimestamp(java.lang.String element, long timestamp) {
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return lock;
+		}
+
+		@Override
+		public void close() {
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9215b724/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
new file mode 100644
index 0000000..4385884
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.runtime.state.SerializedCheckpointData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class for data sources that receive elements from a message queue and
+ * acknowledge them back by IDs.
+ * <p>
+ * The mechanism for this source assumes that messages are identified by a unique ID.
+ * When a message is taken from the message queue, it must not be dropped immediately,
+ * but must be retained until acknowledged. Messages that are not acknowledged within a certain
+ * time interval will be served again (to a different connection, established by the recovered source).
+ * <p>
+ * Note that this source can give no guarantees about message order in the case of failures,
+ * because messages that were retrieved but not yet acknowledged will be returned later again, after
+ * a set of messages that was not retrieved before the failure.
+ * <p>
+ * Internally, this source gathers the IDs of elements it emits. Per checkpoint, the IDs are stored and
+ * acknowledged when the checkpoint is complete. That way, no message is acknowledged unless it is certain
+ * that it has been successfully processed throughout the topology and the updates to any state caused by
+ * that message are persistent.
+ * <p>
+ * All messages that are emitted and successfully processed by the streaming program will eventually be
+ * acknowledged. In corner cases, the source may receive certain IDs multiple times, if a
+ * failure occurs while acknowledging. To cope with this situation, an additional Set stores all
+ * processed IDs. IDs are only removed after they have been acknowledged.
+ * <p>
+ * A typical way to use this base in a source function is by implementing a run() method as follows:
+ * <pre>{@code
+ * public void run(SourceContext<Type> ctx) throws Exception {
+ *     while (running) {
+ *         Message msg = queue.retrieve();
+ *         synchronized (ctx.getCheckpointLock()) {
+ *             ctx.collect(msg.getMessageData());
+ *             addId(msg.getMessageId());
+ *         }
+ *     }
+ * }
+ * }</pre>
+ * 
+ * @param <Type> The type of the messages created by the source.
+ * @param <UId> The type of unique IDs which may be used to acknowledge elements.
+ */
+public abstract class MessageAcknowledgingSourceBase<Type, UId>
+	extends RichSourceFunction<Type>
+	implements Checkpointed<SerializedCheckpointData[]>, CheckpointNotifier {
+
+	private static final long serialVersionUID = -8689291992192955579L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(MessageAcknowledgingSourceBase.class);
+
+	/** Serializer used to serialize the IDs for checkpoints */
+	private final TypeSerializer<UId> idSerializer;
+
+	/** The list gathering the IDs of messages emitted during the current checkpoint */
+	private transient List<UId> idsForCurrentCheckpoint;
+
+	/** The list with IDs from checkpoints that were triggered, but not yet completed or notified of completion */
+	private transient ArrayDeque<Tuple2<Long, List<UId>>> pendingCheckpoints;
+
+	/**
+	 * Set which contains all processed ids. Ids are acknowledged after checkpoints. When restoring
+	 * a checkpoint, ids may be processed again. This happens when the checkpoint completed but the
+	 * ids for a checkpoint haven't been acknowledged yet.
+	 */
+	private transient Set<UId> idsProcessedButNotAcknowledged;
+
+	protected int numCheckpointsToKeep = 10;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new MessageAcknowledgingSourceBase for IDs of the given type.
+	 *
+	 * @param idClass The class of the message ID type, used to create a serializer for the message IDs.
+	 */
+	protected MessageAcknowledgingSourceBase(Class<UId> idClass) {
+		this(TypeExtractor.getForClass(idClass));
+	}
+
+	/**
+	 * Creates a new MessageAcknowledgingSourceBase for IDs of the given type.
+	 *
+	 * @param idTypeInfo The type information of the message ID type, used to create a serializer for the message IDs.
+	 */
+	protected MessageAcknowledgingSourceBase(TypeInformation<UId> idTypeInfo) {
+		this.idSerializer = idTypeInfo.createSerializer(new ExecutionConfig());
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		idsForCurrentCheckpoint = new ArrayList<>(64);
+		pendingCheckpoints = new ArrayDeque<>(numCheckpointsToKeep);
+		idsProcessedButNotAcknowledged = new HashSet<>();
+	}
+
+	@Override
+	public void close() throws Exception {
+		idsForCurrentCheckpoint.clear();
+		pendingCheckpoints.clear();
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  ID Checkpointing
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method must be implemented to acknowledge the given set of IDs back to the message queue.
+	 * @param UIds The list of IDs to acknowledge.
+	 */
+	protected abstract void acknowledgeIDs(long checkpointId, List<UId> UIds);
+
+	/**
+	 * Adds an ID to be stored with the current checkpoint.
+	 * @param uid The ID to add.
+	 * @return True if the id has not been processed previously.
+	 */
+	protected boolean addId(UId uid) {
+		idsForCurrentCheckpoint.add(uid);
+		return idsProcessedButNotAcknowledged.add(uid);
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  Checkpointing the data
+	// ------------------------------------------------------------------------
+
+	@Override
+	public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		LOG.debug("Snapshotting state. Messages: {}, checkpoint id: {}, timestamp: {}",
+					idsForCurrentCheckpoint, checkpointId, checkpointTimestamp);
+
+		pendingCheckpoints.addLast(new Tuple2<>(checkpointId, idsForCurrentCheckpoint));
+
+		idsForCurrentCheckpoint = new ArrayList<>(64);
+
+		return SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer);
+	}
+
+	@Override
+	public void restoreState(SerializedCheckpointData[] state) throws Exception {
+		pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer);
+		// build a set which contains all processed ids. It may be used to check if we have
+		// already processed an incoming message.
+		for (Tuple2<Long, List<UId>> checkpoint : pendingCheckpoints) {
+			idsProcessedButNotAcknowledged.addAll(checkpoint.f1);
+		}
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		LOG.debug("Committing Messages externally for checkpoint {}", checkpointId);
+
+		for (Iterator<Tuple2<Long, List<UId>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) {
+			Tuple2<Long, List<UId>> checkpoint = iter.next();
+			long id = checkpoint.f0;
+
+			if (id <= checkpointId) {
+				LOG.trace("Committing Messages with following IDs {}", checkpoint.f1);
+				acknowledgeIDs(checkpointId, checkpoint.f1);
+				// remove deduplication data
+				idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
+				// remove checkpoint data
+				iter.remove();
+			}
+			else {
+				break;
+			}
+		}
+	}
+}
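
The class javadoc above already sketches the run() loop; what a concrete subclass still has to
supply is the acknowledgment callback. A minimal sketch, assuming a hypothetical `AckingQueue`
client (illustrative only, not a real API):

    import java.util.List;

    import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;

    public class QueueSource extends MessageAcknowledgingSourceBase<String, String> {

        private static final long serialVersionUID = 1L;

        /** Hypothetical queue client, for illustration only. */
        public interface AckingQueue extends java.io.Serializable {
            String[] next();             // returns {messageId, messageBody}
            void ack(List<String> ids);  // acknowledges messages by id
        }

        private final AckingQueue queue;
        private volatile boolean running = true;

        public QueueSource(AckingQueue queue) {
            super(String.class);  // message ids are Strings
            this.queue = queue;
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                String[] msg = queue.next();
                synchronized (ctx.getCheckpointLock()) {
                    // addId() returns false for ids that were emitted before but not
                    // yet acknowledged, i.e. duplicates redelivered after a failure
                    if (addId(msg[0])) {
                        ctx.collect(msg[1]);
                    }
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        protected void acknowledgeIDs(long checkpointId, List<String> ids) {
            // called from notifyCheckpointComplete() once the checkpoint containing
            // these ids has been confirmed as complete
            queue.ack(ids);
        }
    }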

http://git-wip-us.apache.org/repos/asf/flink/blob/9215b724/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
deleted file mode 100644
index 1a24bd8..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.runtime.state.SerializedCheckpointData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Abstract base class for data sources that receive elements from a message queue and
- * acknowledge them back by IDs.
- * <p>
- * The mechanism for this source assumes that messages are identified by a unique ID.
- * When messages are taken from the message queue, the message must not be dropped immediately,
- * but must be retained until acknowledged. Messages that are not acknowledged within a certain
- * time interval will be served again (to a different connection, established by the recovered source).
- * <p>
- * Note that this source can give no guarantees about message order in teh case of failures,
- * because messages that were retrieved but not yet acknowledged will be returned later again, after
- * a set of messages that was not retrieved before the failure.
- * <p>
- * Internally, this source gathers the IDs of elements it emits. Per checkpoint, the IDs are stored and
- * acknowledged when the checkpoint is complete. That way, no message is acknowledged unless it is certain
- * that it has been successfully processed throughout the topology and the updates to any state caused by
- * that message are persistent.
- * <p>
- * All messages that are emitted and successfully processed by the streaming program will eventually be
- * acknowledged. In corner cases, the source may acknowledge certain IDs multiple times, if a
- * failure occurs while acknowledging.
- * <p>
- * A typical way to use this base in a source function is by implementing a run() method as follows:
- * <pre>{@code
- * public void run(SourceContext<Type> ctx) throws Exception {
- *     while (running) {
- *         Message msg = queue.retrieve();
- *         synchronized (ctx.getCheckpointLock()) {
- *             ctx.collect(msg.getMessageData());
- *             addId(msg.getMessageId());
- *         }
- *     }
- * }
- * }</pre>
- * 
- * @param <Type> The type of the messages created by the source.
- * @param <Id> The type of the IDs that are used for acknowledging elements.
- */
-public abstract class MessageAcknowledingSourceBase<Type, Id> extends RichSourceFunction<Type> 
-	implements Checkpointed<SerializedCheckpointData[]>, CheckpointNotifier {
-	
-	private static final long serialVersionUID = -8689291992192955579L;
-	
-	private static final Logger LOG = LoggerFactory.getLogger(MessageAcknowledingSourceBase.class);
-	
-	/** Serializer used to serialize the IDs for checkpoints */
-	private final TypeSerializer<Id> idSerializer;
-	
-	/** The list gathering the IDs of messages emitted during the current checkpoint */
-	private transient List<Id> idsForCurrentCheckpoint;
-
-	/** The list with IDs from checkpoints that were triggered, but not yet completed or notified of completion */
-	private transient ArrayDeque<Tuple2<Long, List<Id>>> pendingCheckpoints;
-	
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new MessageAcknowledingSourceBase for IDs of teh given type.
-	 * 
-	 * @param idClass The class of the message ID type, used to create a serializer for the message IDs.
-	 */
-	protected MessageAcknowledingSourceBase(Class<Id> idClass) {
-		this(TypeExtractor.getForClass(idClass));
-	}
-
-	/**
-	 * Creates a new MessageAcknowledingSourceBase for IDs of teh given type.
-	 * 
-	 * @param idTypeInfo The type information of the message ID type, used to create a serializer for the message IDs.
-	 */
-	protected MessageAcknowledingSourceBase(TypeInformation<Id> idTypeInfo) {
-		this.idSerializer = idTypeInfo.createSerializer(new ExecutionConfig());
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		idsForCurrentCheckpoint = new ArrayList<>(64);
-		pendingCheckpoints = new ArrayDeque<>();
-	}
-
-	@Override
-	public void close() throws Exception {
-		idsForCurrentCheckpoint.clear();
-		pendingCheckpoints.clear();
-	}
-
-
-	// ------------------------------------------------------------------------
-	//  ID Checkpointing
-	// ------------------------------------------------------------------------
-
-	/**
-	 * This method must be implemented to acknowledge the given set of IDs back to the message queue.
-	 * @param ids The list od IDs to acknowledge.
-	 */
-	protected abstract void acknowledgeIDs(List<Id> ids);
-
-	/**
-	 * Adds an ID to be stored with the current checkpoint.
-	 * @param id The ID to add.
-	 */
-	protected void addId(SourceContext<Type> ctx, Id id) {
-		idsForCurrentCheckpoint.add(id);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Checkpointing the data
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Snapshotting state. Messages: {}, checkpoint id: {}, timestamp: {}",
-					idsForCurrentCheckpoint, checkpointId, checkpointTimestamp);
-		}
-		
-		pendingCheckpoints.addLast(new Tuple2<Long, List<Id>>(checkpointId, idsForCurrentCheckpoint));
-		idsForCurrentCheckpoint = new ArrayList<>(64);
-		
-		return SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer);
-	}
-
-	@Override
-	public void restoreState(SerializedCheckpointData[] state) throws Exception {
-		pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer);
-	}
-
-	@Override
-	public void notifyCheckpointComplete(long checkpointId) throws Exception {	
-		// only one commit operation must be in progress
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Committing Messages externally for checkpoint {}", checkpointId);
-		}
-		
-		for (Iterator<Tuple2<Long, List<Id>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) {
-			Tuple2<Long, List<Id>> checkpoint = iter.next();
-			long id = checkpoint.f0;
-			
-			if (id <= checkpointId) {
-				if (LOG.isTraceEnabled()) {
-					LOG.trace("Committing Messages with following IDs {}", checkpoint.f1);
-				}
-				acknowledgeIDs(checkpoint.f1);
-				iter.remove();
-			}
-			else {
-				break;
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9215b724/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
new file mode 100644
index 0000000..c097066
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.SerializedCheckpointData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Abstract base class for data sources that receive elements from a message queue and
+ * acknowledge them back by IDs. In contrast to {@link MessageAcknowledgingSourceBase}, this source
+ * handles two types of ids:
+ *
+ * 1) Session ids
+ * 2) Unique message ids
+ *
+ * Session ids are used to acknowledge messages in a session. When a checkpoint is restored,
+ * unacknowledged messages are redelivered. Duplicates are detected using the unique message ids
+ * which are checkpointed.
+ *
+ * @param <Type> The type of the messages created by the source.
+ * @param <UId> The type of the unique IDs which are consistent across sessions.
+ * @param <SessionId> The type of the IDs that are used for acknowledging elements
+ *                    (ids valid during session).
+ */
+public abstract class MultipleIdsMessageAcknowledgingSourceBase<Type, UId, SessionId>
+		extends MessageAcknowledgingSourceBase<Type, UId> {
+
+	private static final long serialVersionUID = 42L;
+
+	private static final Logger LOG =
+		LoggerFactory.getLogger(MultipleIdsMessageAcknowledgingSourceBase.class);
+
+	/* Session ids per pending snapshot */
+	protected transient Deque<Tuple2<Long, List<SessionId>>> sessionIdsPerSnapshot;
+
+	/* Current session ids for this snapshot */
+	protected transient List<SessionId> sessionIds;
+
+	/**
+	 * Creates a new MultipleIdsMessageAcknowledgingSourceBase for IDs of the given type.
+	 *
+	 * @param idClass The class of the message ID type, used to create a serializer for the message IDs.
+	 */
+	protected MultipleIdsMessageAcknowledgingSourceBase(Class<UId> idClass) {
+		super(idClass);
+	}
+
+	/**
+	 * Creates a new MultipleIdsMessageAcknowledgingSourceBase for IDs of the given type.
+	 *
+	 * @param idTypeInfo The type information of the message ID type, used to create a serializer for the message IDs.
+	 */
+	protected MultipleIdsMessageAcknowledgingSourceBase(TypeInformation<UId> idTypeInfo) {
+		super(idTypeInfo);
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		sessionIds = new ArrayList<>(64);
+		sessionIdsPerSnapshot = new ArrayDeque<>();
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		sessionIds.clear();
+		sessionIdsPerSnapshot.clear();
+	}
+
+	// ------------------------------------------------------------------------
+	//  ID Checkpointing
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Acknowledges the session ids.
+	 * @param checkpointId The id of the current checkpoint to acknowledge ids for.
+	 * @param uniqueIds The checkpointed unique ids which are ignored here. They only serve as a
+	 *                  means of de-duplicating messages when the acknowledgment after a checkpoint
+	 *                  fails.
+	 */
+	protected final void acknowledgeIDs(long checkpointId, List<UId> uniqueIds) {
+		LOG.debug("Acknowledging ids for checkpoint {}", checkpointId);
+		Iterator<Tuple2<Long, List<SessionId>>> iterator = sessionIdsPerSnapshot.iterator();
+		while (iterator.hasNext()) {
+			final Tuple2<Long, List<SessionId>> next = iterator.next();
+			long id = next.f0;
+			if (id <= checkpointId) {
+				acknowledgeSessionIDs(next.f1);
+				// remove ids for this session
+				iterator.remove();
+			}
+		}
+	}
+
+	/**
+	 * Acknowledges the session ids.
+	 * @param sessionIds The message ids for this session.
+	 */
+	protected abstract void acknowledgeSessionIDs(List<SessionId> sessionIds);
+
+	// ------------------------------------------------------------------------
+	//  Checkpointing the data
+	// ------------------------------------------------------------------------
+
+
+	@Override
+	public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		sessionIdsPerSnapshot.add(new Tuple2<>(checkpointId, sessionIds));
+		sessionIds = new ArrayList<>(64);
+		return super.snapshotState(checkpointId, checkpointTimestamp);
+	}
+}
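
A condensed sketch of the two-id pattern this class enables, mirroring what RMQSource does above:
the unique id goes through addId() for deduplication, while the session-scoped id is collected in
sessionIds for acknowledgment. The `Broker` interface below is a hypothetical placeholder, not a
real API:

    import java.util.List;

    import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;

    public class TwoIdSource extends MultipleIdsMessageAcknowledgingSourceBase<String, String, Long> {

        private static final long serialVersionUID = 1L;

        /** Hypothetical broker client, for illustration only. */
        public interface Broker extends java.io.Serializable {
            Object[] next();                 // returns {Long sessionId, String uniqueId, String body}
            void ack(List<Long> sessionIds); // acknowledges by session-scoped id
        }

        private final Broker broker;
        private volatile boolean running = true;

        public TwoIdSource(Broker broker) {
            super(String.class);  // unique ids (e.g. correlation ids) are Strings
            this.broker = broker;
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                Object[] msg = broker.next();
                synchronized (ctx.getCheckpointLock()) {
                    // unique id: survives reconnects, used purely for deduplication
                    if (addId((String) msg[1])) {
                        // session id: only valid for this connection, used to acknowledge
                        sessionIds.add((Long) msg[0]);
                        ctx.collect((String) msg[2]);
                    }
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        protected void acknowledgeSessionIDs(List<Long> sessionIds) {
            // called once per completed checkpoint with the session ids gathered above
            broker.ack(sessionIds);
        }
    }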

