flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kklou...@apache.org
Subject flink git commit: [FLINK-6007] Allow key removal from within the watermark callback.
Date Mon, 13 Mar 2017 19:07:16 GMT
Repository: flink
Updated Branches:
  refs/heads/master 5f08e5359 -> 14c1941d8


[FLINK-6007] Allow key removal from within the watermark callback.

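When a key is unregistered from the InternalWatermarkCallbackService, it
is first put into a separate "to delete" set instead of being removed
immediately, so that the set of registered keys is never modified while
it is being iterated over. The actual deletion happens before the next
invocation of the watermark callback. To avoid checkpointing the
deletion set, the same cleanup is also performed upon checkpointing.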


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/14c1941d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/14c1941d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/14c1941d

Branch: refs/heads/master
Commit: 14c1941d8eaa583eb8f7eeb5478e605850c0d355
Parents: 5f08e53
Author: kl0u <kkloudas@gmail.com>
Authored: Wed Mar 8 20:18:18 2017 +0100
Committer: kl0u <kkloudas@gmail.com>
Committed: Mon Mar 13 17:29:03 2017 +0100

----------------------------------------------------------------------
 .../api/operators/AbstractStreamOperator.java   |   6 +
 .../operators/InternalTimeServiceManager.java   |   5 +
 .../InternalWatermarkCallbackService.java       | 112 +++++++++++++-----
 .../operators/AbstractStreamOperatorTest.java   | 117 +++++++++++++++++++
 .../util/AbstractStreamOperatorTestHarness.java |   9 ++
 5 files changed, 222 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/14c1941d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 6e6b147..ef23be9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -957,4 +957,10 @@ public abstract class AbstractStreamOperator<OUT>
 		return timeServiceManager == null ? 0 :
 			timeServiceManager.numEventTimeTimers();
 	}
+
+	@VisibleForTesting
+	public int numKeysForWatermarkCallback() {
+		return timeServiceManager == null ? 0 :
+			timeServiceManager.numKeysForWatermarkCallback();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/14c1941d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
index 71ffbd2..0b60232 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
@@ -188,4 +188,9 @@ class InternalTimeServiceManager<K, N> {
 		}
 		return count;
 	}
+
+	@VisibleForTesting
+	public int numKeysForWatermarkCallback() {
+		return watermarkCallbackService.numKeysForWatermarkCallback();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/14c1941d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java
index a4263e4..9a43853 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -38,8 +39,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * The watermark callback service allows to register a {@link OnWatermarkCallback OnWatermarkCallback}
  * and multiple keys, for which the callback will be invoked every time a new {@link Watermark} is received
  * (after the registration of the key).
- * <p>
- * <b>NOTE: </b> This service is only available to <b>keyed</b> operators.
+ *
+ * <p><b>NOTE: </b> This service is only available to <b>keyed</b> operators.
  *
  *  @param <K> The type of key returned by the {@code KeySelector}.
  */
@@ -58,7 +59,17 @@ public class InternalWatermarkCallbackService<K> {
 	 * An array of sets of keys keeping the registered keys split
 	 * by the key-group they belong to. Each key-group has one set.
 	 */
-	private final Set<K>[] keysByKeygroup;
+	private final Set<K>[] registeredKeysByKeyGroup;
+
+	/**
+	 * An array of sets of keys keeping the keys "to delete" split
+	 * by the key-group they belong to. Each key-group has one set.
+	 *
+	 * <p>This is used to avoid potential concurrent modification
+	 * exception when deleting keys from inside the
+	 * {@link #invokeOnWatermarkCallback(Watermark)}.
+	 */
+	private final Set<K>[] deletedKeysByKeyGroup;
 
 	/** A serializer for the registered keys. */
 	private TypeSerializer<K> keySerializer;
@@ -84,7 +95,8 @@ public class InternalWatermarkCallbackService<K> {
 
 		// the list of ids of the key-groups this task is responsible for
 		int localKeyGroups = this.localKeyGroupRange.getNumberOfKeyGroups();
-		this.keysByKeygroup = new Set[localKeyGroups];
+		this.registeredKeysByKeyGroup = new Set[localKeyGroups];
+		this.deletedKeysByKeyGroup = new Set[localKeyGroups];
 	}
 
 	/**
@@ -110,7 +122,7 @@ public class InternalWatermarkCallbackService<K> {
 	 * @param key The key to be registered.
 	 */
 	public boolean registerKeyForWatermarkCallback(K key) {
-		return getKeySetForKeyGroup(key).add(key);
+		return getRegisteredKeysForKeyGroup(key).add(key);
 	}
 
 	/**
@@ -119,13 +131,7 @@ public class InternalWatermarkCallbackService<K> {
 	 * @param key The key to be unregistered.
 	 */
 	public boolean unregisterKeyFromWatermarkCallback(K key) {
-		Set<K> keys = getKeySetForKeyGroup(key);
-		boolean res = keys.remove(key);
-
-		if (keys.isEmpty()) {
-			removeKeySetForKey(key);
-		}
-		return res;
+		return getDeletedKeysForKeyGroup(key).add(key);
 	}
 
 	/**
@@ -134,8 +140,11 @@ public class InternalWatermarkCallbackService<K> {
 	 * @param watermark The watermark that triggered the invocation.
 	 */
 	public void invokeOnWatermarkCallback(Watermark watermark) throws IOException {
+		// clean up any keys registered for deletion before calling the callback
+		cleanupRegisteredKeys();
+
 		if (callback != null) {
-			for (Set<K> keySet : keysByKeygroup) {
+			for (Set<K> keySet : registeredKeysByKeyGroup) {
 				if (keySet != null) {
 					for (K key : keySet) {
 						keyContext.setCurrentKey(key);
@@ -147,15 +156,38 @@ public class InternalWatermarkCallbackService<K> {
 	}
 
 	/**
+	 * Does the actual deletion of any keys registered for deletion using the
+	 * {@link #unregisterKeyFromWatermarkCallback(Object)}.
+	 */
+	private void cleanupRegisteredKeys() {
+		for (int keyGroupIdx = 0; keyGroupIdx < registeredKeysByKeyGroup.length; keyGroupIdx++)
{
+
+			Set<K> deletedKeys = deletedKeysByKeyGroup[keyGroupIdx];
+			if (deletedKeys != null) {
+
+				Set<K> registeredKeys = registeredKeysByKeyGroup[keyGroupIdx];
+				if (registeredKeys != null) {
+
+					registeredKeys.removeAll(deletedKeys);
+					if (registeredKeys.isEmpty()) {
+						registeredKeysByKeyGroup[keyGroupIdx] = null;
+					}
+				}
+				deletedKeysByKeyGroup[keyGroupIdx] = null;
+			}
+		}
+	}
+
+	/**
 	 * Retrieve the set of keys for the key-group this key belongs to.
 	 *
 	 * @param key the key whose key-group we are searching.
 	 * @return the set of registered keys for the key-group.
 	 */
-	private Set<K> getKeySetForKeyGroup(K key) {
+	private Set<K> getRegisteredKeysForKeyGroup(K key) {
 		checkArgument(localKeyGroupRange != null, "The operator has not been initialized.");
 		int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(key, totalKeyGroups);
-		return getKeySetForKeyGroup(keyGroupIdx);
+		return getRegisteredKeysForKeyGroup(keyGroupIdx);
 	}
 
 	/**
@@ -164,27 +196,36 @@ public class InternalWatermarkCallbackService<K> {
 	 * @param keyGroupIdx the index of the key group we are interested in.
 	 * @return the set of keys for the key-group.
 	 */
-	private Set<K> getKeySetForKeyGroup(int keyGroupIdx) {
+	private Set<K> getRegisteredKeysForKeyGroup(int keyGroupIdx) {
 		int localIdx = getIndexForKeyGroup(keyGroupIdx);
-		Set<K> keys = keysByKeygroup[localIdx];
+		Set<K> keys = registeredKeysByKeyGroup[localIdx];
 		if (keys == null) {
 			keys = new HashSet<>();
-			keysByKeygroup[localIdx] = keys;
+			registeredKeysByKeyGroup[localIdx] = keys;
 		}
 		return keys;
 	}
 
-	private void removeKeySetForKey(K key) {
+	private Set<K> getDeletedKeysForKeyGroup(K key) {
 		checkArgument(localKeyGroupRange != null, "The operator has not been initialized.");
 		int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(key, totalKeyGroups);
-		int localKeyGroupIdx = getIndexForKeyGroup(keyGroupIdx);
-		keysByKeygroup[localKeyGroupIdx] = null;
+		return getDeletedKeysForKeyGroup(keyGroupIdx);
+	}
+
+	private Set<K> getDeletedKeysForKeyGroup(int keyGroupIdx) {
+		int localIdx = getIndexForKeyGroup(keyGroupIdx);
+		Set<K> keys = deletedKeysByKeyGroup[localIdx];
+		if (keys == null) {
+			keys = new HashSet<>();
+			deletedKeysByKeyGroup[localIdx] = keys;
+		}
+		return keys;
 	}
 
 	/**
 	 * Computes the index of the requested key-group in the local datastructures.
-	 * <li/>
-	 * Currently we assume that each task is assigned a continuous range of key-groups,
+	 *
+	 * <p>Currently we assume that each task is assigned a continuous range of key-groups,
 	 * e.g. 1,2,3,4, and not 1,3,5. We leverage this to keep the different states
 	 * key-grouped in arrays instead of maps, where the offset for each key-group is
 	 * the key-group id (an int) minus the id of the first key-group in the local range.
@@ -199,7 +240,11 @@ public class InternalWatermarkCallbackService<K> {
 	//////////////////				Fault Tolerance Methods				///////////////////
 
 	public void snapshotKeysForKeyGroup(DataOutputViewStreamWrapper stream, int keyGroupIdx)
throws Exception {
-		Set<K> keySet = getKeySetForKeyGroup(keyGroupIdx);
+
+		// we cleanup also here to avoid checkpointing the deletion set
+		cleanupRegisteredKeys();
+
+		Set<K> keySet = getRegisteredKeysForKeyGroup(keyGroupIdx);
 		if (keySet != null) {
 			stream.writeInt(keySet.size());
 
@@ -224,16 +269,29 @@ public class InternalWatermarkCallbackService<K> {
 			TypeSerializer<K> tmpKeyDeserializer = InstantiationUtil.deserializeObject(stream,
userCodeClassLoader);
 
 			if (keySerializer != null && !keySerializer.equals(tmpKeyDeserializer)) {
-				throw new IllegalArgumentException("Tried to restore timers " +
-					"for the same service with different serializers.");
+				throw new IllegalArgumentException("Tried to restore keys " +
+					"for the watermark callback service with mismatching serializers.");
 			}
 
 			this.keySerializer = tmpKeyDeserializer;
 
-			Set<K> keys = getKeySetForKeyGroup(keyGroupIdx);
+			Set<K> keys = getRegisteredKeysForKeyGroup(keyGroupIdx);
 			for (int i = 0; i < numKeys; i++) {
 				keys.add(keySerializer.deserialize(stream));
 			}
 		}
 	}
+
+	//////////////////				Testing Methods				///////////////////
+
+	@VisibleForTesting
+	public int numKeysForWatermarkCallback() {
+		int count = 0;
+		for (Set<K> keyGroup: registeredKeysByKeyGroup) {
+			if (keyGroup != null) {
+				count += keyGroup.size();
+			}
+		}
+		return count;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/14c1941d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 33def9e..eeee8dc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -788,6 +788,78 @@ public class AbstractStreamOperatorTest {
 		testHarness5.close();
 	}
 
+	@Test
+	public void testWatermarkCallbackServiceKeyDeletion() throws Exception {
+		final int MAX_PARALLELISM = 10;
+
+		Tuple2<Integer, String> element1 = new Tuple2<>(7, "start");
+		Tuple2<Integer, String> element2 = new Tuple2<>(45, "start");
+		Tuple2<Integer, String> element3 = new Tuple2<>(90, "start");
+
+		TestOperatorWithDeletingKeyCallback op = new TestOperatorWithDeletingKeyCallback(45);
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, Integer>
testHarness1 =
+			new KeyedOneInputStreamOperatorTestHarness<>(
+				op,
+				new TestKeySelector(),
+				BasicTypeInfo.INT_TYPE_INFO,
+				MAX_PARALLELISM,
+				1,
+				0);
+		testHarness1.open();
+
+		testHarness1.processElement(new StreamRecord<>(element1));
+		testHarness1.processElement(new StreamRecord<>(element2));
+
+		testHarness1.processWatermark(10L);
+
+		assertEquals(3L, testHarness1.getOutput().size());
+		verifyElement(testHarness1.getOutput().poll(), 7);
+		verifyElement(testHarness1.getOutput().poll(), 45);
+		verifyWatermark(testHarness1.getOutput().poll(), 10);
+
+		testHarness1.processElement(new StreamRecord<>(element3));
+		testHarness1.processWatermark(20L);
+
+		// because at the first watermark the operator removed key 45
+		assertEquals(3L, testHarness1.getOutput().size());
+		verifyElement(testHarness1.getOutput().poll(), 7);
+		verifyElement(testHarness1.getOutput().poll(), 90);
+		verifyWatermark(testHarness1.getOutput().poll(), 20);
+
+		testHarness1.processWatermark(25L);
+
+		verifyElement(testHarness1.getOutput().poll(), 7);
+		verifyElement(testHarness1.getOutput().poll(), 90);
+		verifyWatermark(testHarness1.getOutput().poll(), 25);
+
+		// unregister key and then fail
+		op.unregisterKey(90);
+
+		// take a snapshot with some elements in internal sorting queue
+		OperatorStateHandles snapshot = testHarness1.snapshot(0, 0);
+		testHarness1.close();
+
+		testHarness1 = new KeyedOneInputStreamOperatorTestHarness<>(
+				new TestOperatorWithDeletingKeyCallback(45),
+				new TestKeySelector(),
+				BasicTypeInfo.INT_TYPE_INFO,
+				MAX_PARALLELISM,
+				1,
+				0);
+		testHarness1.setup();
+		testHarness1.initializeState(snapshot);
+		testHarness1.open();
+
+		testHarness1.processWatermark(30L);
+
+		assertEquals(2L, testHarness1.getOutput().size());
+		verifyElement(testHarness1.getOutput().poll(), 7);
+		verifyWatermark(testHarness1.getOutput().poll(), 30);
+
+		testHarness1.close();
+	}
+
 	private KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>,
Integer> getTestHarness(
 			int maxParallelism, int noOfTasks, int taskIdx) throws Exception {
 
@@ -868,6 +940,51 @@ public class AbstractStreamOperatorTest {
 		}
 	}
 
+	private static class TestOperatorWithDeletingKeyCallback
+			extends AbstractStreamOperator<Integer>
+			implements OneInputStreamOperator<Tuple2<Integer, String>, Integer> {
+
+		private static final long serialVersionUID = 9215057823264582305L;
+
+		private final int keyToDelete;
+
+		public TestOperatorWithDeletingKeyCallback(int keyToDelete) {
+			this.keyToDelete = keyToDelete;
+		}
+
+		@Override
+		public void open() throws Exception {
+			super.open();
+
+			InternalWatermarkCallbackService<Integer> callbackService = getInternalWatermarkCallbackService();
+
+			callbackService.setWatermarkCallback(new OnWatermarkCallback<Integer>() {
+
+				@Override
+				public void onWatermark(Integer integer, Watermark watermark) throws IOException {
+
+					// this is to simulate the case where we may have a concurrent modification
+					// exception as we iterate over the list of registered keys and we concurrently
+					// delete the key.
+
+					if (integer.equals(keyToDelete)) {
+						getInternalWatermarkCallbackService().unregisterKeyFromWatermarkCallback(integer);
+					}
+					output.collect(new StreamRecord<>(integer));
+				}
+			}, IntSerializer.INSTANCE);
+		}
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws
Exception {
+			getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(element.getValue().f0);
+		}
+
+		public void unregisterKey(int key) {
+			getInternalWatermarkCallbackService().unregisterKeyFromWatermarkCallback(key);
+		}
+	}
+
 	/**
 	 * Testing operator that can respond to commands by either setting/deleting state, emitting
 	 * state or setting timers.

http://git-wip-us.apache.org/repos/asf/flink/blob/14c1941d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index c984eed..f0a4c42 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -597,6 +597,15 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		}
 	}
 
+	@VisibleForTesting
+	public int numKeysForWatermarkCallback() {
+		if (operator instanceof AbstractStreamOperator) {
+			return ((AbstractStreamOperator) operator).numKeysForWatermarkCallback();
+		} else {
+			throw new UnsupportedOperationException();
+		}
+	}
+
 	private class MockOutput implements Output<StreamRecord<OUT>> {
 
 		private TypeSerializer<OUT> outputSerializer;


Mime
View raw message