flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject flink git commit: [FLINK-6207] Duplicate TypeSerializers for async snapshots of CopyOnWriteStateTable
Date Tue, 28 Mar 2017 17:16:02 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.2 75db91e62 -> bb3e26f97


[FLINK-6207] Duplicate TypeSerializers for async snapshots of CopyOnWriteStateTable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb3e26f9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb3e26f9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb3e26f9

Branch: refs/heads/release-1.2
Commit: bb3e26f971f902b0080ac524398715ba4156d9b1
Parents: 75db91e
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Tue Mar 28 18:58:32 2017 +0200
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Tue Mar 28 18:58:32 2017 +0200

----------------------------------------------------------------------
 .../async/CopyOnWriteStateTableSnapshot.java    |  32 +++-
 .../heap/async/CopyOnWriteStateTableTest.java   | 169 ++++++++++++++++++-
 2 files changed, 193 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bb3e26f9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableSnapshot.java
index db3b197..c17b62e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableSnapshot.java
@@ -71,6 +71,21 @@ public class CopyOnWriteStateTableSnapshot<K, N, S>
 	private int[] keyGroupOffsets;
 
 	/**
+	 * A local duplicate of the table's key serializer.
+	 */
+	private final TypeSerializer<K> localKeySerializer;
+
+	/**
+	 * A local duplicate of the table's namespace serializer.
+	 */
+	private final TypeSerializer<N> localNamespaceSerializer;
+
+	/**
+	 * A local duplicate of the table's state serializer.
+	 */
+	private final TypeSerializer<S> localStateSerializer;
+
+	/**
 	 * Creates a new {@link CopyOnWriteStateTableSnapshot}.
 	 *
 	 * @param owningStateTable the {@link CopyOnWriteStateTable} for which this object represents a snapshot.
@@ -81,6 +96,13 @@ public class CopyOnWriteStateTableSnapshot<K, N, S>
 		this.snapshotData = owningStateTable.snapshotTableArrays();
 		this.snapshotVersion = owningStateTable.getStateTableVersion();
 		this.stateTableSize = owningStateTable.size();
+
+		// We create duplicates of the serializers for the async snapshot, because TypeSerializer
+		// might be stateful and shared with the event processing thread.
+		this.localKeySerializer = owningStateTable.keyContext.getKeySerializer().duplicate();
+		this.localNamespaceSerializer = owningStateTable.metaInfo.getNamespaceSerializer().duplicate();
+		this.localStateSerializer = owningStateTable.metaInfo.getStateSerializer().duplicate();
+
 		this.keyGroupOffsets = null;
 	}
 
@@ -162,10 +184,6 @@ public class CopyOnWriteStateTableSnapshot<K, N, S>
 		int startOffset = keyGroupOffsetIdx < 0 ? 0 : keyGroupOffsets[keyGroupOffsetIdx];
 		int endOffset = keyGroupOffsets[keyGroupOffsetIdx + 1];
 
-		TypeSerializer<K> keySerializer = owningStateTable.keyContext.getKeySerializer();
-		TypeSerializer<N> namespaceSerializer = owningStateTable.metaInfo.getNamespaceSerializer();
-		TypeSerializer<S> stateSerializer = owningStateTable.metaInfo.getStateSerializer();
-
 		// write number of mappings in key-group
 		dov.writeInt(endOffset - startOffset);
 
@@ -173,9 +191,9 @@ public class CopyOnWriteStateTableSnapshot<K, N, S>
 		for (int i = startOffset; i < endOffset; ++i) {
 			CopyOnWriteStateTable.StateTableEntry<K, N, S> toWrite = groupedOut[i];
 			groupedOut[i] = null; // free asap for GC
-			namespaceSerializer.serialize(toWrite.namespace, dov);
-			keySerializer.serialize(toWrite.key, dov);
-			stateSerializer.serialize(toWrite.state, dov);
+			localNamespaceSerializer.serialize(toWrite.namespace, dov);
+			localKeySerializer.serialize(toWrite.key, dov);
+			localStateSerializer.serialize(toWrite.state, dov);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb3e26f9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableTest.java
index fb36d67..7445c1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableTest.java
@@ -23,13 +23,19 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.ArrayListSerializer;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -37,7 +43,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
-public class CopyOnWriteStateTableTest {
+public class CopyOnWriteStateTableTest extends TestLogger {
 
 	/**
 	 * Testing the basic map operations.
@@ -380,6 +386,77 @@ public class CopyOnWriteStateTableTest {
 		Assert.assertTrue(originalState5 == stateTable.get(5, 1));
 	}
 
+	/**
+	 * This tests that serializers used for snapshots are duplicates of the ones used in
+	 * processing to avoid race conditions in stateful serializers.
+	 */
+	@Test
+	public void testSerializerDuplicationInSnapshot() throws IOException {
+
+		final TestDuplicateSerializer namespaceSerializer = new TestDuplicateSerializer();
+		final TestDuplicateSerializer stateSerializer = new TestDuplicateSerializer();;
+		final TestDuplicateSerializer keySerializer = new TestDuplicateSerializer();;
+
+		RegisteredBackendStateMetaInfo<Integer, Integer> metaInfo =
+			new RegisteredBackendStateMetaInfo<>(
+				StateDescriptor.Type.VALUE,
+				"test",
+				namespaceSerializer,
+				stateSerializer);
+
+		final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
+		InternalKeyContext<Integer> mockKeyContext = new InternalKeyContext<Integer>()
{
+			@Override
+			public Integer getCurrentKey() {
+				return 0;
+			}
+
+			@Override
+			public int getCurrentKeyGroupIndex() {
+				return 0;
+			}
+
+			@Override
+			public int getNumberOfKeyGroups() {
+				return 1;
+			}
+
+			@Override
+			public KeyGroupRange getKeyGroupRange() {
+				return keyGroupRange;
+			}
+
+			@Override
+			public TypeSerializer<Integer> getKeySerializer() {
+				return keySerializer;
+			}
+		};
+
+		CopyOnWriteStateTable<Integer, Integer, Integer> table =
+			new CopyOnWriteStateTable<>(mockKeyContext, metaInfo);
+
+		table.put(0, 0, 0, 0);
+		table.put(1, 0, 0, 1);
+		table.put(2, 0, 1, 2);
+
+
+		CopyOnWriteStateTableSnapshot<Integer, Integer, Integer> snapshot = table.createSnapshot();
+
+		try {
+
+			namespaceSerializer.disable();
+			keySerializer.disable();
+			stateSerializer.disable();
+
+			snapshot.writeMappingsInKeyGroup(
+				new DataOutputViewStreamWrapper(
+					new ByteArrayOutputStreamWithPos(1024)), 0);
+
+		} finally {
+			table.releaseSnapshot(snapshot);
+		}
+	}
+
 	@SuppressWarnings("unchecked")
 	private static <K, N, S> Tuple3<K, N, S>[] convert(CopyOnWriteStateTable.StateTableEntry<K,
N, S>[] snapshot, int mapSize) {
 
@@ -483,4 +560,94 @@ public class CopyOnWriteStateTableTest {
 			return serializer;
 		}
 	}
+
+	/**
+	 * Serializer that can be disabled. Duplicates are still enabled, so we can check that
+	 * serializers are duplicated.
+	 */
+	static class TestDuplicateSerializer extends TypeSerializer<Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		private static final Integer ZERO = 0;
+
+		private boolean disabled;
+
+		public TestDuplicateSerializer() {
+			this.disabled = false;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return true;
+		}
+
+		@Override
+		public TypeSerializer<Integer> duplicate() {
+			return new TestDuplicateSerializer();
+		}
+
+		@Override
+		public Integer createInstance() {
+			return ZERO;
+		}
+
+		@Override
+		public Integer copy(Integer from) {
+			return from;
+		}
+
+		@Override
+		public Integer copy(Integer from, Integer reuse) {
+			return from;
+		}
+
+		@Override
+		public int getLength() {
+			return 4;
+		}
+
+		@Override
+		public void serialize(Integer record, DataOutputView target) throws IOException {
+			Assert.assertFalse(disabled);
+			target.writeInt(record);
+		}
+
+		@Override
+		public Integer deserialize(DataInputView source) throws IOException {
+			Assert.assertFalse(disabled);
+			return source.readInt();
+		}
+
+		@Override
+		public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
+			Assert.assertFalse(disabled);
+			return deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			Assert.assertFalse(disabled);
+			target.writeInt(source.readInt());
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof TestDuplicateSerializer;
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof TestDuplicateSerializer;
+		}
+
+		@Override
+		public int hashCode() {
+			return getClass().hashCode();
+		}
+
+		public void disable() {
+			this.disabled = true;
+		}
+	}
 }


Mime
View raw message