flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [5/5] flink git commit: [FLINK-3265] [rabbitmq] Fix concurrency bug in RabbitMQ
Date Thu, 28 Jan 2016 15:32:20 GMT
[FLINK-3265] [rabbitmq] Fix concurrency bug in RabbitMQ

This closes #1534


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b01a890
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b01a890
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b01a890

Branch: refs/heads/master
Commit: 6b01a89020f2de3f7710cf72336291b1e8ca8562
Parents: d97fcda
Author: Robert Metzger <rmetzger@apache.org>
Authored: Thu Jan 21 12:22:21 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Jan 28 14:43:03 2016 +0100

----------------------------------------------------------------------
 .../connectors/rabbitmq/RMQSource.java          |  4 +-
 .../connectors/rabbitmq/RMQSourceTest.java      | 79 ++++++++++++++++++++
 .../source/MessageAcknowledgingSourceBase.java  | 51 +++++++------
 ...ltipleIdsMessageAcknowledgingSourceBase.java | 24 +++---
 4 files changed, 124 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6b01a890/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 09bb07c..59bc057 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -196,7 +196,9 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 							continue;
 						}
 					}
-					sessionIds.add(deliveryTag);
+					synchronized (sessionIdsPerSnapshot) {
+						sessionIds.add(deliveryTag);
+					}
 				}
 
 				ctx.collect(result);

http://git-wip-us.apache.org/repos/asf/flink/blob/6b01a890/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index aa19e5d..0a3de84 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -23,6 +23,7 @@ import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.QueueingConsumer;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
@@ -31,6 +32,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -103,6 +105,83 @@ public class RMQSourceTest {
 		sourceThread.join();
 	}
 
+	/**
+	 * Make sure concurrent access to snapshotState() and notifyCheckpointComplete() don't cause
+	 * an issue.
+	 *
+	 * Without proper synchronization, the test will fail with a concurrent modification exception
+	 *
+	 */
+	@Test
+	public void testConcurrentAccess() throws Exception {
+		source.autoAck = false;
+		sourceThread.start();
+
+		final Tuple1<Throwable> error = new Tuple1<>(null);
+
+		Thread.sleep(5);
+
+		Thread snapshotThread = new Thread(new Runnable() {
+			public long id = 0;
+
+			@Override
+			public void run() {
+				while (!Thread.interrupted()) {
+					try {
+						source.snapshotState(id++, 0);
+					} catch (Exception e) {
+						error.f0 = e;
+						break; // stop thread
+					}
+				}
+			}
+		});
+
+		Thread notifyThread = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				while (!Thread.interrupted()) {
+					try {
+						// always remove all checkpoints
+						source.notifyCheckpointComplete(Long.MAX_VALUE);
+					} catch (Exception e) {
+						error.f0 = e;
+						break; // stop thread
+					}
+				}
+			}
+		});
+
+		snapshotThread.start();
+		notifyThread.start();
+
+		long deadline = System.currentTimeMillis() + 1000L;
+		while(System.currentTimeMillis() < deadline) {
+			if(!snapshotThread.isAlive()) {
+				notifyThread.interrupt();
+				break;
+			}
+			if(!notifyThread.isAlive()) {
+				snapshotThread.interrupt();
+				break;
+			}
+			Thread.sleep(10);
+		}
+		if(snapshotThread.isAlive()) {
+			snapshotThread.interrupt();
+			snapshotThread.join();
+		}
+		if(notifyThread.isAlive()) {
+			notifyThread.interrupt();
+			notifyThread.join();
+		}
+		if(error.f0 != null) {
+			error.f0.printStackTrace();
+			Assert.fail("Test failed with " + error.f0.getClass().getCanonicalName());
+		}
+
+	}
+
 	@Test
 	public void testCheckpointing() throws Exception {
 		source.autoAck = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b01a890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
index 4385884..2f865d1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
@@ -76,6 +76,7 @@ import org.slf4j.LoggerFactory;
  * @param <Type> The type of the messages created by the source.
  * @param <UId> The type of unique IDs which may be used to acknowledge elements.
  */
+@SuppressWarnings("SynchronizeOnNonFinalField")
 public abstract class MessageAcknowledgingSourceBase<Type, UId>
 	extends RichSourceFunction<Type>
 	implements Checkpointed<SerializedCheckpointData[]>, CheckpointNotifier {
@@ -166,41 +167,45 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
 		LOG.debug("Snapshotting state. Messages: {}, checkpoint id: {}, timestamp: {}",
 					idsForCurrentCheckpoint, checkpointId, checkpointTimestamp);
 
-		pendingCheckpoints.addLast(new Tuple2<>(checkpointId, idsForCurrentCheckpoint));
+		synchronized (pendingCheckpoints) {
+			pendingCheckpoints.addLast(new Tuple2<>(checkpointId, idsForCurrentCheckpoint));
 
-		idsForCurrentCheckpoint = new ArrayList<>(64);
+			idsForCurrentCheckpoint = new ArrayList<>(64);
 
-		return SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer);
+			return SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer);
+		}
 	}
 
 	@Override
 	public void restoreState(SerializedCheckpointData[] state) throws Exception {
-		pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer);
-		// build a set which contains all processed ids. It may be used to check if we have
-		// already processed an incoming message.
-		for (Tuple2<Long, List<UId>> checkpoint : pendingCheckpoints) {
-			idsProcessedButNotAcknowledged.addAll(checkpoint.f1);
+		synchronized (pendingCheckpoints) {
+			pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer);
+			// build a set which contains all processed ids. It may be used to check if we have
+			// already processed an incoming message.
+			for (Tuple2<Long, List<UId>> checkpoint : pendingCheckpoints) {
+				idsProcessedButNotAcknowledged.addAll(checkpoint.f1);
+			}
 		}
 	}
 
 	@Override
 	public void notifyCheckpointComplete(long checkpointId) throws Exception {
 		LOG.debug("Committing Messages externally for checkpoint {}", checkpointId);
-
-		for (Iterator<Tuple2<Long, List<UId>>> iter = pendingCheckpoints.iterator();
iter.hasNext();) {
-			Tuple2<Long, List<UId>> checkpoint = iter.next();
-			long id = checkpoint.f0;
-
-			if (id <= checkpointId) {
-				LOG.trace("Committing Messages with following IDs {}", checkpoint.f1);
-				acknowledgeIDs(checkpointId, checkpoint.f1);
-				// remove deduplication data
-				idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
-				// remove checkpoint data
-				iter.remove();
-			}
-			else {
-				break;
+		synchronized (pendingCheckpoints) {
+			for (Iterator<Tuple2<Long, List<UId>>> iter = pendingCheckpoints.iterator();
iter.hasNext(); ) {
+				Tuple2<Long, List<UId>> checkpoint = iter.next();
+				long id = checkpoint.f0;
+
+				if (id <= checkpointId) {
+					LOG.trace("Committing Messages with following IDs {}", checkpoint.f1);
+					acknowledgeIDs(checkpointId, checkpoint.f1);
+					// remove deduplication data
+					idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
+					// remove checkpoint data
+					iter.remove();
+				} else {
+					break;
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b01a890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
index c097066..4709759 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
@@ -107,14 +107,16 @@ public abstract class MultipleIdsMessageAcknowledgingSourceBase<Type,
UId, Sessi
 	 */
 	protected final void acknowledgeIDs(long checkpointId, List<UId> uniqueIds) {
 		LOG.debug("Acknowledging ids for checkpoint {}", checkpointId);
-		Iterator<Tuple2<Long, List<SessionId>>> iterator = sessionIdsPerSnapshot.iterator();
-		while (iterator.hasNext()) {
-			final Tuple2<Long, List<SessionId>> next = iterator.next();
-			long id = next.f0;
-			if (id <= checkpointId) {
-				acknowledgeSessionIDs(next.f1);
-				// remove ids for this session
-				iterator.remove();
+		synchronized (sessionIdsPerSnapshot) {
+			Iterator<Tuple2<Long, List<SessionId>>> iterator = sessionIdsPerSnapshot.iterator();
+			while (iterator.hasNext()) {
+				final Tuple2<Long, List<SessionId>> next = iterator.next();
+				long id = next.f0;
+				if (id <= checkpointId) {
+					acknowledgeSessionIDs(next.f1);
+					// remove ids for this session
+					iterator.remove();
+				}
 			}
 		}
 	}
@@ -132,8 +134,10 @@ public abstract class MultipleIdsMessageAcknowledgingSourceBase<Type,
UId, Sessi
 
 	@Override
 	public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp)
throws Exception {
-		sessionIdsPerSnapshot.add(new Tuple2<>(checkpointId, sessionIds));
-		sessionIds = new ArrayList<>(64);
+		synchronized (sessionIdsPerSnapshot) {
+			sessionIdsPerSnapshot.add(new Tuple2<>(checkpointId, sessionIds));
+			sessionIds = new ArrayList<>(64);
+		}
 		return super.snapshotState(checkpointId, checkpointTimestamp);
 	}
 }


Mime
View raw message