flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [12/15] flink git commit: [FLINK-6801] [core] Relax missing fields check when reading PojoSerializerConfigSnapshot
Date Tue, 13 Jun 2017 05:49:09 GMT
[FLINK-6801] [core] Relax missing fields check when reading PojoSerializerConfigSnapshot

Prior to this commit, reading a PojoSerializerConfigSnapshot would fail if
the underlying POJO type was missing one of the snapshotted fields.
Failing the deserialization of the config snapshot is too severe,
because it leaves no opportunity to restore the checkpoint at all.
Instead, we should be able to restore the config and hand it to the
new PojoSerializer, giving it the chance to provide a convert deserializer.

This commit changes this by restoring only the field names when reading
the PojoSerializerConfigSnapshot. In PojoSerializer.ensureCompatibility,
each field name is used to look up the matching field of the new PojoSerializer.
This change does not alter the serialization format of the
PojoSerializerConfigSnapshot.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8f239d7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8f239d7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8f239d7

Branch: refs/heads/release-1.3
Commit: b8f239d723aa4dd145835f6f8de33b89b4bded92
Parents: deaf4bf
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Sun Jun 4 12:30:58 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Tue Jun 13 07:48:32 2017 +0200

----------------------------------------------------------------------
 .../java/typeutils/runtime/PojoSerializer.java  | 65 +++++++-------------
 .../typeutils/runtime/PojoSerializerTest.java   | 18 +++---
 2 files changed, 32 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b8f239d7/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 2311158..7818897 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -600,18 +600,18 @@ public final class PojoSerializer<T> extends TypeSerializer<T>
{
 						(TypeSerializer<Object>[]) new TypeSerializer<?>[this.numFields];
 
 					int i = 0;
-					for (Map.Entry<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
fieldToConfigSnapshotEntry
+					for (Map.Entry<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
fieldToConfigSnapshotEntry
 							: config.getFieldToSerializerConfigSnapshot().entrySet()) {
 
 						int fieldIndex = findField(fieldToConfigSnapshotEntry.getKey());
 						if (fieldIndex != -1) {
-							reorderedFields[i] = fieldToConfigSnapshotEntry.getKey();
+							reorderedFields[i] = fields[fieldIndex];
 
 							compatResult = CompatibilityUtil.resolveCompatibilityResult(
-									fieldToConfigSnapshotEntry.getValue().f0,
-									UnloadableDummyTypeSerializer.class,
-									fieldToConfigSnapshotEntry.getValue().f1,
-									fieldSerializers[fieldIndex]);
+								fieldToConfigSnapshotEntry.getValue().f0,
+								UnloadableDummyTypeSerializer.class,
+								fieldToConfigSnapshotEntry.getValue().f1,
+								fieldSerializers[fieldIndex]);
 
 							if (compatResult.isRequiresMigration()) {
 								requiresMigration = true;
@@ -745,7 +745,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T>
{
 		 * may reorder the fields in case they are different. The order of the fields need to
 		 * stay the same for binary compatibility, as the field order is part of the serialization
format.
 		 */
-		private LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
fieldToSerializerConfigSnapshot;
+		private LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
fieldToSerializerConfigSnapshot;
 
 		/**
 		 * Ordered map of registered subclasses to their corresponding serializers and its configuration
snapshots.
@@ -771,7 +771,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T>
{
 
 		public PojoSerializerConfigSnapshot(
 				Class<T> pojoType,
-				LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
fieldToSerializerConfigSnapshot,
+				LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
fieldToSerializerConfigSnapshot,
 				LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
registeredSubclassesToSerializerConfigSnapshots,
 				HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
nonRegisteredSubclassesToSerializerConfigSnapshots) {
 
@@ -785,7 +785,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T>
{
 
 		public PojoSerializerConfigSnapshot(
 				Class<T> pojoType,
-				LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
fieldToSerializerConfigSnapshot,
+				LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
fieldToSerializerConfigSnapshot,
 				LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
registeredSubclassesToSerializerConfigSnapshots,
 				HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
nonRegisteredSubclassesToSerializerConfigSnapshots,
 				boolean ignoreTypeSerializerSerialization) {
@@ -813,10 +813,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T>
{
 				// --- write fields and their serializers, in order
 
 				out.writeInt(fieldToSerializerConfigSnapshot.size());
-				for (Map.Entry<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
entry
+				for (Map.Entry<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
entry
 						: fieldToSerializerConfigSnapshot.entrySet()) {
 
-					outViewWrapper.writeUTF(entry.getKey().getName());
+					outViewWrapper.writeUTF(entry.getKey());
 
 					out.writeInt(outWithPos.getPosition());
 					if (!ignoreTypeSerializerSerialization) {
@@ -904,39 +904,20 @@ public final class PojoSerializer<T> extends TypeSerializer<T>
{
 
 				this.fieldToSerializerConfigSnapshot = new LinkedHashMap<>(numFields);
 				String fieldName;
-				Field field;
 				TypeSerializer<?> fieldSerializer;
 				TypeSerializerConfigSnapshot fieldSerializerConfigSnapshot;
 				for (int i = 0; i < numFields; i++) {
 					fieldName = inViewWrapper.readUTF();
 
-					// search all superclasses for the field
-					Class<?> clazz = getTypeClass();
-					field = null;
-					while (clazz != null) {
-						try {
-							field = clazz.getDeclaredField(fieldName);
-							field.setAccessible(true);
-							break;
-						} catch (NoSuchFieldException e) {
-							clazz = clazz.getSuperclass();
-						}
-					}
+					inWithPos.setPosition(fieldSerializerOffsets[i * 2]);
+					fieldSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader());
 
-					if (field == null) {
-						// the field no longer exists in the POJO
-						throw new IOException("Can't find field " + fieldName + " in POJO class " + getTypeClass().getName());
-					} else {
-						inWithPos.setPosition(fieldSerializerOffsets[i * 2]);
-						fieldSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper,
getUserCodeClassLoader());
-
-						inWithPos.setPosition(fieldSerializerOffsets[i * 2 + 1]);
-						fieldSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper,
getUserCodeClassLoader());
+					inWithPos.setPosition(fieldSerializerOffsets[i * 2 + 1]);
+					fieldSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper,
getUserCodeClassLoader());
 
-						fieldToSerializerConfigSnapshot.put(
-							field,
-							new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializer,
fieldSerializerConfigSnapshot));
-					}
+					fieldToSerializerConfigSnapshot.put(
+						fieldName,
+						new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializer,
fieldSerializerConfigSnapshot));
 				}
 
 				// --- read registered subclasses and their serializers, in registration order
@@ -998,7 +979,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T>
{
 			return VERSION;
 		}
 
-		public LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
getFieldToSerializerConfigSnapshot() {
+		public LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
getFieldToSerializerConfigSnapshot() {
 			return fieldToSerializerConfigSnapshot;
 		}
 
@@ -1144,10 +1125,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T>
{
 	 * Finds and returns the order (0-based) of a POJO field.
 	 * Returns -1 if the field does not exist for this POJO.
 	 */
-	private int findField(Field f) {
+	private int findField(String fieldName) {
 		int foundIndex = 0;
 		for (Field field : fields) {
-			if (f.equals(field)) {
+			if (fieldName.equals(field.getName())) {
 				return foundIndex;
 			}
 
@@ -1174,12 +1155,12 @@ public final class PojoSerializer<T> extends TypeSerializer<T>
{
 			TypeSerializer<?>[] fieldSerializers,
 			HashMap<Class<?>, TypeSerializer<?>> nonRegisteredSubclassSerializerCache)
{
 
-		final LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
fieldToSerializerConfigSnapshots =
+		final LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
fieldToSerializerConfigSnapshots =
 			new LinkedHashMap<>(fields.length);
 
 		for (int i = 0; i < fields.length; i++) {
 			fieldToSerializerConfigSnapshots.put(
-				fields[i],
+				fields[i].getName(),
 				new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializers[i],
fieldSerializers[i].snapshotConfiguration()));
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f239d7/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index 10f4708..e5315aa 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -484,35 +484,35 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 		// creating this serializer just for generating config snapshots of the field serializers
 		PojoSerializer<TestUserClass> ser = (PojoSerializer<TestUserClass>) type.createSerializer(new
ExecutionConfig());
 
-		LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
mockOriginalFieldToSerializerConfigSnapshot =
+		LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
mockOriginalFieldToSerializerConfigSnapshot =
 			new LinkedHashMap<>(mockOriginalFieldOrder.length);
 		mockOriginalFieldToSerializerConfigSnapshot.put(
-			mockOriginalFieldOrder[0],
+			mockOriginalFieldOrder[0].getName(),
 			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
 				ser.getFieldSerializers()[3],
 				ser.getFieldSerializers()[3].snapshotConfiguration()));
 		mockOriginalFieldToSerializerConfigSnapshot.put(
-			mockOriginalFieldOrder[1],
+			mockOriginalFieldOrder[1].getName(),
 			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
 				ser.getFieldSerializers()[2],
 				ser.getFieldSerializers()[2].snapshotConfiguration()));
 		mockOriginalFieldToSerializerConfigSnapshot.put(
-			mockOriginalFieldOrder[2],
+			mockOriginalFieldOrder[2].getName(),
 			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
 				ser.getFieldSerializers()[5],
 				ser.getFieldSerializers()[5].snapshotConfiguration()));
 		mockOriginalFieldToSerializerConfigSnapshot.put(
-			mockOriginalFieldOrder[3],
+			mockOriginalFieldOrder[3].getName(),
 			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
 				ser.getFieldSerializers()[0],
 				ser.getFieldSerializers()[0].snapshotConfiguration()));
 		mockOriginalFieldToSerializerConfigSnapshot.put(
-			mockOriginalFieldOrder[4],
+			mockOriginalFieldOrder[4].getName(),
 			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
 				ser.getFieldSerializers()[1],
 				ser.getFieldSerializers()[1].snapshotConfiguration()));
 		mockOriginalFieldToSerializerConfigSnapshot.put(
-			mockOriginalFieldOrder[5],
+			mockOriginalFieldOrder[5].getName(),
 			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
 				ser.getFieldSerializers()[4],
 				ser.getFieldSerializers()[4].snapshotConfiguration()));
@@ -579,9 +579,9 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 			PojoSerializer.PojoSerializerConfigSnapshot<?> original,
 			PojoSerializer.PojoSerializerConfigSnapshot<?> deserializedConfig) {
 
-		LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
originalFieldSerializersAndConfs =
+		LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
originalFieldSerializersAndConfs =
 				original.getFieldToSerializerConfigSnapshot();
-		for (Map.Entry<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
entry
+		for (Map.Entry<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
entry
 				: deserializedConfig.getFieldToSerializerConfigSnapshot().entrySet()) {
 
 			Assert.assertEquals(null, entry.getValue().f0);


Mime
View raw message