flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-7347] Keep IDs for checkpoint in a set in MessageQueue Source
Date Fri, 18 Aug 2017 09:26:12 GMT
Repository: flink
Updated Branches:
  refs/heads/master 7b2362406 -> 76f102288


[FLINK-7347] Keep IDs for checkpoint in a set in MessageQueue Source

Previously, the IDs for each checkpoint were kept in a List, which made
removing them from the Set of unconfirmed IDs prohibitively expensive in
MessageAcknowledgingSourceBase.notifyCheckpointComplete().


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/76f10228
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/76f10228
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/76f10228

Branch: refs/heads/master
Commit: 76f1022884fe7b291fe81028a29896fb5b5ca5c9
Parents: 7b23624
Author: Yonatan Most <yomost@microsoft.com>
Authored: Wed Aug 2 17:58:50 2017 +0300
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Aug 18 11:17:43 2017 +0200

----------------------------------------------------------------------
 .../connectors/rabbitmq/RMQSourceTest.java      | 12 +++++-----
 .../runtime/state/SerializedCheckpointData.java | 23 ++++++++++----------
 .../source/MessageAcknowledgingSourceBase.java  | 16 +++++++-------
 ...ltipleIdsMessageAcknowledgingSourceBase.java |  4 +++-
 4 files changed, 28 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/76f10228/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index 05ae810..f180e78 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -51,8 +51,8 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
-import java.util.List;
 import java.util.Random;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -180,12 +180,12 @@ public class RMQSourceTest {
 			testHarnessCopy.initializeState(data);
 			testHarnessCopy.open();
 
-			ArrayDeque<Tuple2<Long, List<String>>> deque = sourceCopy.getRestoredState();
-			List<String> messageIds = deque.getLast().f1;
+			ArrayDeque<Tuple2<Long, Set<String>>> deque = sourceCopy.getRestoredState();
+			Set<String> messageIds = deque.getLast().f1;
 
 			assertEquals(numIds, messageIds.size());
 			if (messageIds.size() > 0) {
-				assertEquals(lastSnapshotId, (long) Long.valueOf(messageIds.get(messageIds.size() - 1)));
+				assertTrue(messageIds.contains(Long.toString(lastSnapshotId)));
 			}
 
 			// check if the messages are being acknowledged and the transaction committed
@@ -339,7 +339,7 @@ public class RMQSourceTest {
 
 	private class RMQTestSource extends RMQSource<String> {
 
-		private ArrayDeque<Tuple2<Long, List<String>>> restoredState;
+		private ArrayDeque<Tuple2<Long, Set<String>>> restoredState;
 
 		public RMQTestSource() {
 			super(new RMQConnectionConfig.Builder().setHost("hostTest")
@@ -353,7 +353,7 @@ public class RMQSourceTest {
 			this.restoredState = this.pendingCheckpoints;
 		}
 
-		public ArrayDeque<Tuple2<Long, List<String>>> getRestoredState() {
+		public ArrayDeque<Tuple2<Long, Set<String>>> getRestoredState() {
 			return this.restoredState;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/76f10228/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCheckpointData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCheckpointData.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCheckpointData.java
index 16ad3fd..394791b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCheckpointData.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCheckpointData.java
@@ -25,8 +25,8 @@ import org.apache.flink.runtime.util.DataOutputSerializer;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * This class represents serialized checkpoint data for a collection of elements.
@@ -95,7 +95,7 @@ public class SerializedCheckpointData implements java.io.Serializable {
 	 * 
 	 * @throws IOException Thrown, if the serialization fails.
 	 */
-	public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, List<T>>> checkpoints,
+	public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, Set<T>>> checkpoints,
 												TypeSerializer<T> serializer) throws IOException {
 		return fromDeque(checkpoints, serializer, new DataOutputSerializer(128));
 	}
@@ -111,15 +111,15 @@ public class SerializedCheckpointData implements java.io.Serializable {
 	 *
 	 * @throws IOException Thrown, if the serialization fails.
 	 */
-	public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, List<T>>> checkpoints,
+	public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, Set<T>>> checkpoints,
 												TypeSerializer<T> serializer,
 												DataOutputSerializer outputBuffer) throws IOException {
 		SerializedCheckpointData[] serializedCheckpoints = new SerializedCheckpointData[checkpoints.size()];
 		
 		int pos = 0;
-		for (Tuple2<Long, List<T>> checkpoint : checkpoints) {
+		for (Tuple2<Long, Set<T>> checkpoint : checkpoints) {
 			outputBuffer.clear();
-			List<T> checkpointIds = checkpoint.f1;
+			Set<T> checkpointIds = checkpoint.f1;
 			
 			for (T id : checkpointIds) {
 				serializer.serialize(id, outputBuffer);
@@ -146,10 +146,9 @@ public class SerializedCheckpointData implements java.io.Serializable {
 	 * 
 	 * @throws IOException Thrown, if the serialization fails.
 	 */
-	public static <T> ArrayDeque<Tuple2<Long, List<T>>> toDeque(
-			SerializedCheckpointData[] data, TypeSerializer<T> serializer) throws IOException
-	{
-		ArrayDeque<Tuple2<Long, List<T>>> deque = new ArrayDeque<>(data.length);
+	public static <T> ArrayDeque<Tuple2<Long, Set<T>>> toDeque(
+			SerializedCheckpointData[] data, TypeSerializer<T> serializer) throws IOException {
+		ArrayDeque<Tuple2<Long, Set<T>>> deque = new ArrayDeque<>(data.length);
 		DataInputDeserializer deser = null;
 		
 		for (SerializedCheckpointData checkpoint : data) {
@@ -161,14 +160,14 @@ public class SerializedCheckpointData implements java.io.Serializable {
 				deser.setBuffer(serializedData, 0, serializedData.length);
 			}
 			
-			final List<T> ids = new ArrayList<>(checkpoint.getNumIds());
+			final Set<T> ids = new HashSet<>(checkpoint.getNumIds());
 			final int numIds = checkpoint.getNumIds();
 			
 			for (int i = 0; i < numIds; i++) {
 				ids.add(serializer.deserialize(deser));
 			}
 
-			deque.addLast(new Tuple2<Long, List<T>>(checkpoint.checkpointId, ids));
+			deque.addLast(new Tuple2<Long, Set<T>>(checkpoint.checkpointId, ids));
 		}
 		
 		return deque;

http://git-wip-us.apache.org/repos/asf/flink/blob/76f10228/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
index ab21586..604755d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
@@ -96,13 +96,13 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
 	private final TypeSerializer<UId> idSerializer;
 
 	/** The list gathering the IDs of messages emitted during the current checkpoint. */
-	private transient List<UId> idsForCurrentCheckpoint;
+	private transient Set<UId> idsForCurrentCheckpoint;
 
 	/**
 	 * The list with IDs from checkpoints that were triggered, but not yet completed or notified of
 	 * completion.
 	 */
-	protected transient ArrayDeque<Tuple2<Long, List<UId>>> pendingCheckpoints;
+	protected transient ArrayDeque<Tuple2<Long, Set<UId>>> pendingCheckpoints;
 
 	/**
 	 * Set which contain all processed ids. Ids are acknowledged after checkpoints. When restoring
@@ -142,7 +142,7 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
 			.getOperatorStateStore()
 			.getSerializableListState("message-acknowledging-source-state");
 
-		this.idsForCurrentCheckpoint = new ArrayList<>(64);
+		this.idsForCurrentCheckpoint = new HashSet<>(64);
 		this.pendingCheckpoints = new ArrayDeque<>();
 		this.idsProcessedButNotAcknowledged = new HashSet<>();
 
@@ -161,7 +161,7 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
 			pendingCheckpoints = SerializedCheckpointData.toDeque(retrievedStates.get(0), idSerializer);
 			// build a set which contains all processed ids. It may be used to check if we have
 			// already processed an incoming message.
-			for (Tuple2<Long, List<UId>> checkpoint : pendingCheckpoints) {
+			for (Tuple2<Long, Set<UId>> checkpoint : pendingCheckpoints) {
 				idsProcessedButNotAcknowledged.addAll(checkpoint.f1);
 			}
 		} else {
@@ -185,7 +185,7 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
 	 *
 	 * @param uIds The list od IDs to acknowledge.
 	 */
-	protected abstract void acknowledgeIDs(long checkpointId, List<UId> uIds);
+	protected abstract void acknowledgeIDs(long checkpointId, Set<UId> uIds);
 
 	/**
 	 * Adds an ID to be stored with the current checkpoint.
@@ -213,7 +213,7 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
 		}
 
 		pendingCheckpoints.addLast(new Tuple2<>(context.getCheckpointId(), idsForCurrentCheckpoint));
-		idsForCurrentCheckpoint = new ArrayList<>(64);
+		idsForCurrentCheckpoint = new HashSet<>(64);
 
 		this.checkpointedState.clear();
 		this.checkpointedState.add(SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer));
@@ -223,8 +223,8 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
 	public void notifyCheckpointComplete(long checkpointId) throws Exception {
 		LOG.debug("Committing Messages externally for checkpoint {}", checkpointId);
 
-		for (Iterator<Tuple2<Long, List<UId>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) {
-			Tuple2<Long, List<UId>> checkpoint = iter.next();
+		for (Iterator<Tuple2<Long, Set<UId>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) {
+			Tuple2<Long, Set<UId>> checkpoint = iter.next();
 			long id = checkpoint.f0;
 
 			if (id <= checkpointId) {

http://git-wip-us.apache.org/repos/asf/flink/blob/76f10228/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
index e7cdb99..d0c0741 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
@@ -32,6 +32,7 @@ import java.util.ArrayList;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Abstract base class for data sources that receive elements from a message queue and
@@ -110,7 +111,8 @@ public abstract class MultipleIdsMessageAcknowledgingSourceBase<Type, UId, Sessi
 	 *                  means of de-duplicating messages when the acknowledgment after a checkpoint
 	 *                  fails.
 	 */
-	protected final void acknowledgeIDs(long checkpointId, List<UId> uniqueIds) {
+	@Override
+	protected final void acknowledgeIDs(long checkpointId, Set<UId> uniqueIds) {
 		LOG.debug("Acknowledging ids for checkpoint {}", checkpointId);
 		Iterator<Tuple2<Long, List<SessionId>>> iterator = sessionIdsPerSnapshot.iterator();
 		while (iterator.hasNext()) {


Mime
View raw message