flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/3] incubator-flink git commit: [FLINK-1286] [APIs] [runtime] Fix serialization in CollectionInputFormat and generate meaningful error messages
Date Wed, 26 Nov 2014 17:51:25 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 82f5154a9 -> 112b3a937


[FLINK-1286] [APIs] [runtime] Fix serialization in CollectionInputFormat and generate meaningful error messages


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d8589303
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d8589303
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d8589303

Branch: refs/heads/master
Commit: d85893036b9a3122900010ce975feba43b27531d
Parents: 82f5154
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Nov 26 17:06:51 2014 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Nov 26 18:14:58 2014 +0100

----------------------------------------------------------------------
 .../plantranslate/NepheleJobGraphGenerator.java | 24 +++++++++-
 .../operators/util/UserCodeClassWrapper.java    |  5 ++
 .../operators/util/UserCodeObjectWrapper.java   |  5 ++
 .../common/operators/util/UserCodeWrapper.java  | 24 ++++++----
 .../api/java/io/CollectionInputFormat.java      | 28 ++++++++----
 .../api/java/io/CollectionInputFormatTest.java  |  5 +-
 .../runtime/jobgraph/InputFormatVertex.java     | 45 ++++++++++++++----
 .../runtime/jobgraph/OutputFormatVertex.java    | 48 ++++++++++++++------
 8 files changed, 137 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index eb6fe2e..bb537b7 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.aggregators.LongSumAggregator;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.dag.TempMode;
@@ -835,6 +836,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
 
 		vertex.setInvokableClass(DataSourceTask.class);
+		vertex.setFormatDescription(getDescriptionForUserCode(node.getPactContract().getUserCodeWrapper()));
 
 		// set user code
 		config.setStubWrapper(node.getPactContract().getUserCodeWrapper());
@@ -850,7 +852,8 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 
 		vertex.setInvokableClass(DataSinkTask.class);
 		vertex.getConfiguration().setInteger(DataSinkTask.DEGREE_OF_PARALLELISM_KEY, node.getDegreeOfParallelism());
-
+		vertex.setFormatDescription(getDescriptionForUserCode(node.getPactContract().getUserCodeWrapper()));
+		
 		// set user code
 		config.setStubWrapper(node.getPactContract().getUserCodeWrapper());
 		config.setStubParameters(node.getPactContract().getParameters());
@@ -1426,6 +1429,25 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		syncConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new
LongSumAggregator());
 		syncConfig.setConvergenceCriterion(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new
WorksetEmptyConvergenceCriterion());
 	}
+	
+	private static String getDescriptionForUserCode(UserCodeWrapper<?> wrapper) {
+		try {
+			if (wrapper.hasObject()) {
+				try {
+					return wrapper.getUserCodeObject().toString();
+				}
+				catch (Throwable t) {
+					return wrapper.getUserCodeClass().getName();
+				}
+			}
+			else {
+				return wrapper.getUserCodeClass().getName();
+			}
+		}
+		catch (Throwable t) {
+			return null;
+		}
+	}
 
 	// -------------------------------------------------------------------------------------
 	// Descriptors for tasks / configurations that are chained or merged with other tasks

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeClassWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeClassWrapper.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeClassWrapper.java
index 61fc382..1b1d5ce 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeClassWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeClassWrapper.java
@@ -53,4 +53,9 @@ public class UserCodeClassWrapper<T> implements UserCodeWrapper<T> {
 	public Class<? extends T> getUserCodeClass() {
 		return userCodeClass;
 	}
+	
+	@Override
+	public boolean hasObject() {
+		return false;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
index 95a425e..f06de90 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
@@ -121,4 +121,9 @@ public class UserCodeObjectWrapper<T> implements UserCodeWrapper<T> {
 	public Class<? extends T> getUserCodeClass() {
 		return (Class<? extends T>) userCodeObject.getClass();
 	}
+	
+	@Override
+	public boolean hasObject() {
+		return true;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeWrapper.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeWrapper.java
index 3d13041..032c906 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeWrapper.java
@@ -22,29 +22,26 @@ import java.io.Serializable;
 import java.lang.annotation.Annotation;
 
 /**
- * PACT contracts can have either a class or an object containing the user
+ * UDF operators can have either a class or an object containing the user
  * code, this is the common interface to access them.
  */
 public interface UserCodeWrapper<T> extends Serializable {
+	
 	/**
-	 * Gets the user code object. In the case of a pact, that object will be the stub with the user function,
-	 * in the case of an input or output format, it will be the format object.
-	 * 
+	 * Gets the user code object, which may be either a function or an input or output format.
 	 * The subclass is supposed to just return the user code object or instantiate the class.
 	 * 
 	 * @return The class with the user code.
 	 */
-	public T getUserCodeObject(Class<? super T> superClass, ClassLoader cl);
+	T getUserCodeObject(Class<? super T> superClass, ClassLoader cl);
 	
 	/**
 	 * Gets the user code object. In the case of a pact, that object will be the stub with the user function,
 	 * in the case of an input or output format, it will be the format object.
 	 * 
-	 * The subclass is supposed to just return the user code object or instantiate the class.
-	 * 
 	 * @return The class with the user code.
 	 */
-	public T getUserCodeObject();
+	T getUserCodeObject();
 	
 	/**
 	 * Gets an annotation that pertains to the user code class. By default, this method will look for
@@ -55,7 +52,7 @@ public interface UserCodeWrapper<T> extends Serializable {
 	 *        the Class object corresponding to the annotation type
 	 * @return the annotation, or null if no annotation of the requested type was found
 	 */
-	public <A extends Annotation> A getUserCodeAnnotation(Class<A> annotationClass);
+	<A extends Annotation> A getUserCodeAnnotation(Class<A> annotationClass);
 	
 	/**
 	 * Gets the class of the user code. If the user code is provided as a class, this class is just returned.
@@ -63,5 +60,12 @@ public interface UserCodeWrapper<T> extends Serializable {
 	 * 
 	 * @return The class of the user code object.
 	 */
-	public Class<? extends T> getUserCodeClass ();
+	Class<? extends T> getUserCodeClass ();
+	
+	/**
+	 * Checks whether the wrapper already has an object, or whether it needs to instantiate it.
+	 * 
+	 * @return True if the wrapper already has an object, false if it has only a class.
+	 */
+	boolean hasObject();
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
index b999ede..97e8715 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
@@ -78,11 +78,15 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 
 	private void writeObject(ObjectOutputStream out) throws IOException {
 		out.defaultWriteObject();
-		out.writeInt(dataSet.size());
 		
-		OutputViewObjectOutputStreamWrapper wrapper = new OutputViewObjectOutputStreamWrapper(out);
-		for (T element : dataSet){
-			serializer.serialize(element, wrapper);
+		final int size = dataSet.size();
+		out.writeInt(size);
+		
+		if (size > 0) {
+			OutputViewObjectOutputStreamWrapper wrapper = new OutputViewObjectOutputStreamWrapper(out);
+			for (T element : dataSet){
+				serializer.serialize(element, wrapper);
+			}
 		}
 	}
 
@@ -92,11 +96,17 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 		int collectionLength = in.readInt();
 		List<T> list = new ArrayList<T>(collectionLength);
 		
-		InputViewObjectInputStreamWrapper wrapper = new InputViewObjectInputStreamWrapper(in);
-		for (int i = 0; i < collectionLength; i++){
-			T element = serializer.createInstance();
-			element = serializer.deserialize(element, wrapper);
-			list.add(element);
+		if (collectionLength > 0) {
+			try {
+				InputViewObjectInputStreamWrapper wrapper = new InputViewObjectInputStreamWrapper(in);
+				for (int i = 0; i < collectionLength; i++){
+					T element = serializer.deserialize(wrapper);
+					list.add(element);
+				}
+			}
+			catch (Throwable t) {
+				throw new IOException("Error while deserializing element from collection", t);
+			}
 		}
 
 		dataSet = list;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index 948d22f..64dae22 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -246,11 +246,8 @@ public class CollectionInputFormatTest {
 				in.readObject();
 				fail("should throw an exception");
 			}
-			catch (TestException e) {
-				// expected
-			}
 			catch (Exception e) {
-				fail("Exception not properly forwarded");
+				assertTrue(e.getCause() instanceof TestException);
 			}
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
index 8ee4da4..0ea0da7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
@@ -26,8 +26,7 @@ public class InputFormatVertex extends AbstractJobVertex {
 
 	private static final long serialVersionUID = 1L;
 	
-	/** Caches the output format associated to this output vertex. */
-	private transient InputFormat<?, ?> inputFormat;
+	private String formatDescription;
 	
 	
 	public InputFormatVertex(String name) {
@@ -39,19 +38,47 @@ public class InputFormatVertex extends AbstractJobVertex {
 	}
 	
 	
+	public void setFormatDescription(String formatDescription) {
+		this.formatDescription = formatDescription;
+	}
+	
+	public String getFormatDescription() {
+		return formatDescription;
+	}
+	
 	@Override
 	public void initializeOnMaster(ClassLoader loader) throws Exception {
-		if (inputFormat == null) {
-			TaskConfig cfg = new TaskConfig(getConfiguration());
-			UserCodeWrapper<InputFormat<?, ?>> wrapper = cfg.getStubWrapper(loader);
-			
-			if (wrapper == null) {
-				throw new Exception("No input format present in InputFormatVertex's task configuration.");
-			}
+		final TaskConfig cfg = new TaskConfig(getConfiguration());
+		
+		// deserialize from the payload
+		UserCodeWrapper<InputFormat<?, ?>> wrapper;
+		try {
 			
+			wrapper = cfg.getStubWrapper(loader);
+		}
+		catch (Throwable t) {
+			throw new Exception("Deserializing the InputFormat (" + formatDescription + ") failed:
" + t.getMessage(), t);
+		}
+		if (wrapper == null) {
+			throw new Exception("No input format present in InputFormatVertex's task configuration.");
+		}
+		
+		// instantiate, if necessary
+		InputFormat<?, ?> inputFormat;
+		try {
 			inputFormat = wrapper.getUserCodeObject(InputFormat.class, loader);
+		}
+		catch (Throwable t) {
+			throw new Exception("Instantiating the InputFormat (" + formatDescription + ") failed:
" + t.getMessage(), t);
+		}
+		
+		// configure
+		try {
 			inputFormat.configure(cfg.getStubParameters());
 		}
+		catch (Throwable t) {
+			throw new Exception("Configuring the InputFormat (" + formatDescription + ") failed: "
+ t.getMessage(), t);
+		}
 		
 		setInputSplitSource(inputFormat);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
index 365ed92..708b390 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
@@ -31,10 +31,8 @@ public class OutputFormatVertex extends AbstractJobVertex {
 	
 	private static final long serialVersionUID = 1L;
 	
+	private String formatDescription;
 	
-	/** Caches the output format associated to this output vertex. */
-	private transient OutputFormat<?> outputFormat;
-
 	/**
 	 * Creates a new task vertex with the specified name.
 	 * 
@@ -44,23 +42,45 @@ public class OutputFormatVertex extends AbstractJobVertex {
 		super(name);
 	}
 	
+	public void setFormatDescription(String formatDescription) {
+		this.formatDescription = formatDescription;
+	}
+	
+	public String getFormatDescription() {
+		return formatDescription;
+	}
 	
 	@Override
 	public void initializeOnMaster(ClassLoader loader) throws Exception {
-		if (this.outputFormat == null) {
-			TaskConfig cfg = new TaskConfig(getConfiguration());
-			UserCodeWrapper<OutputFormat<?>> wrapper = cfg.<OutputFormat<?>>getStubWrapper(loader);
+		final TaskConfig cfg = new TaskConfig(getConfiguration());
 		
-			if (wrapper == null) {
-				throw new Exception("No output format present in OutputFormatVertex's task configuration.");
-			}
-
-			this.outputFormat = wrapper.getUserCodeObject(OutputFormat.class, loader);
-			this.outputFormat.configure(cfg.getStubParameters());
+		UserCodeWrapper<OutputFormat<?>> wrapper;
+		try {
+			wrapper = cfg.<OutputFormat<?>>getStubWrapper(loader);
+		}
+		catch (Throwable t) {
+			throw new Exception("Deserializing the OutputFormat (" + formatDescription + ") failed:
" + t.getMessage(), t);
+		}
+		if (wrapper == null) {
+			throw new Exception("No input format present in InputFormatVertex's task configuration.");
+		}
+		
+		OutputFormat<?> outputFormat;
+		try {
+			outputFormat = wrapper.getUserCodeObject(OutputFormat.class, loader);
+		}
+		catch (Throwable t) {
+			throw new Exception("Instantiating the OutputFormat (" + formatDescription + ") failed:
" + t.getMessage(), t);
+		}
+		try {
+			outputFormat.configure(cfg.getStubParameters());
+		}
+		catch (Throwable t) {
+			throw new Exception("Configuring the OutputFormat (" + formatDescription + ") failed:
" + t.getMessage(), t);
 		}
 		
-		if (this.outputFormat instanceof InitializeOnMaster) {
-			((InitializeOnMaster) this.outputFormat).initializeGlobal(getParallelism());
+		if (outputFormat instanceof InitializeOnMaster) {
+			((InitializeOnMaster) outputFormat).initializeGlobal(getParallelism());
 		}
 	}
 }


Mime
View raw message