flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/8] flink git commit: [FLINK-5692] [core] (followups) Add an Option to Deactivate Kryo Fallback for Serializers
Date Thu, 16 Mar 2017 14:05:43 GMT
[FLINK-5692] [core] (followups) Add an Option to Deactivate Kryo Fallback for Serializers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d498cbed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d498cbed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d498cbed

Branch: refs/heads/master
Commit: d498cbedfda7c5ebabbc5f8203d7518926ede423
Parents: 0f99aae
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Mar 15 15:18:49 2017 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Mar 16 14:43:26 2017 +0100

----------------------------------------------------------------------
 docs/dev/types_serialization.md                 | 10 ++++--
 .../flink/api/common/ExecutionConfig.java       | 35 ++++++++++++++++----
 .../api/java/typeutils/GenericTypeInfo.java     |  2 +-
 .../flink/api/common/ExecutionConfigTest.java   | 17 +++++-----
 4 files changed, 45 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d498cbed/docs/dev/types_serialization.md
----------------------------------------------------------------------
diff --git a/docs/dev/types_serialization.md b/docs/dev/types_serialization.md
index 20ee071..053a839 100644
--- a/docs/dev/types_serialization.md
+++ b/docs/dev/types_serialization.md
@@ -306,12 +306,18 @@ env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<?
extends Serializ
 
 There are different variants of these methods available.
 
-If you do not want to fall back to Kryo and further make sure that you have provided your
own custom serializers for all POJOs explicitly, set
+
+## Disabling Kryo Fallback
+
+There are cases when programs may want to explicitly avoid using Kryo as a fallback for generic
types. The most
+common one is wanting to ensure that all types are efficiently serialized either through
Flink's own serializers,
+or via user-defined custom serializers.
+
+The setting below will raise an exception whenever a data type is encountered that would
go through Kryo:
 {% highlight java %}
 env.getConfig().disableGenericTypes();
 {% endhighlight %}
 
-If generic types disabled, an {@link UnsupportedOperationException} will be thrown when Flink
tries to fall back to the default Kryo serializer logic in the runtime.
 
 ## Defining Type Information using a Factory
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d498cbed/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 3bd91c7..26e6af1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -109,6 +109,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 
 	private boolean forceKryo = false;
 
+	/** Flag to indicate whether generic types (through Kryo) are disabled */
 	private boolean disableGenericTypes = false;
 
 	private boolean objectReuse = false;
@@ -521,26 +522,46 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	}
 
 	/**
-	 * Enable generic types.
+	 * Enables the use of generic types, which are serialized via Kryo.
+	 * 
+	 * <p>Generic types are enabled by default.
 	 *
-	 * @see ExecutionConfig#disableGenericTypes()
+	 * @see #disableGenericTypes()
 	 */
 	public void enableGenericTypes() {
 		disableGenericTypes = false;
 	}
 
 	/**
-	 * Disable generic types to make sure that you have provided your own custom serializers
for
-	 * all POJOs explicitly.
+	 * Disables the use of generic types (types that would be serialized via Kryo). If this
option
+	 * is used, Flink will throw an {@code UnsupportedOperationException} whenever it encounters
+	 * a data type that would go through Kryo for serialization.
 	 *
-	 * If generic types disabled,
-	 * an {@link UnsupportedOperationException} will be thrown when Flink
-	 * tries to fall back to the default Kryo serializer logic in the runtime.
+	 * <p>Disabling generic types can be helpful to eagerly find and eliminate the use
of types
+	 * that would go through Kryo serialization during runtime. Rather than checking types
+	 * individually, using this option will throw exceptions eagerly in the places where generic
+	 * types are used.
+	 * 
+	 * <p><b>Important:</b> We recommend using this option only during development
and pre-production
+	 * phases, not during actual production use. The application program and/or the input data
may be
+	 * such that new, previously unseen, types occur at some point. In that case, setting this
option
+	 * would cause the program to fail.
+	 * 
+	 * @see #enableGenericTypes()
 	 */
 	public void disableGenericTypes() {
 		disableGenericTypes = true;
 	}
 
+	/**
+	 * Checks whether generic types are disabled. Generic types are types that go through Kryo
during
+	 * serialization.
+	 * 
+	 * <p>Generic types are enabled by default.
+	 * 
+	 * @see #enableGenericTypes()
+	 * @see #disableGenericTypes()
+	 */
 	public boolean hasGenericTypesDisabled() {
 		return disableGenericTypes;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d498cbed/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index a4cea31..136b7d7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -83,7 +83,7 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements
AtomicType
 	public TypeSerializer<T> createSerializer(ExecutionConfig config) {
 		if (config.hasGenericTypesDisabled()) {
 			throw new UnsupportedOperationException(
-				"Generic types are disabled for POJOs serialization, but type " + this.typeClass +
+				"Generic types have been disabled in the ExecutionConfig and type " + this.typeClass.getName()
+
 				" is treated as a generic type.");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d498cbed/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index 4956a9a..d000ff9 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -70,23 +70,22 @@ public class ExecutionConfigTest {
 	}
 
 	@Test
-	public void testForceCustomSerializerCheck() {
+	public void testDisableGenericTypes() {
 		ExecutionConfig conf = new ExecutionConfig();
 		TypeInformation<Object> typeInfo = new GenericTypeInfo<Object>(Object.class);
+
+		// by default, generic types are supported
 		TypeSerializer<Object> serializer = typeInfo.createSerializer(conf);
 		assertTrue(serializer instanceof KryoSerializer);
 
+		// expect an exception when generic types are disabled
 		conf.disableGenericTypes();
-		boolean createSerializerFailed = false;
 		try {
 			typeInfo.createSerializer(conf);
-		} catch (UnsupportedOperationException e) {
-			createSerializerFailed = true;
-		} catch (Throwable t) {
-			fail("Unexpected exception thrown: " + t.getMessage());
+			fail("should have failed with an exception");
+		}
+		catch (UnsupportedOperationException e) {
+			// expected
 		}
-
-		assertTrue(createSerializerFailed);
 	}
-
 }


Mime
View raw message