flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [3/3] flink git commit: [FLINK-1319] [core] Add static code analysis for user code
Date Mon, 08 Jun 2015 05:33:52 GMT
[FLINK-1319] [core] Add static code analysis for user code

This closes #729.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c854d526
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c854d526
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c854d526

Branch: refs/heads/master
Commit: c854d5260c20b0926c4347c7c9dd7d0f4f11d620
Parents: d433ba9
Author: twalthr <twalthr@apache.org>
Authored: Tue May 26 20:22:03 2015 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Mon Jun 8 07:29:49 2015 +0200

----------------------------------------------------------------------
 .../flink/api/common/CodeAnalysisMode.java      |   52 +
 .../flink/api/common/ExecutionConfig.java       |   26 +
 flink-java/pom.xml                              |    4 +-
 .../flink/api/java/ExecutionEnvironment.java    |    3 +
 .../java/org/apache/flink/api/java/Utils.java   |    4 +
 .../api/java/functions/FunctionAnnotation.java  |   17 +-
 .../api/java/functions/SemanticPropUtil.java    |  133 +-
 .../api/java/operators/CoGroupOperator.java     |    6 +-
 .../flink/api/java/operators/CrossOperator.java |    2 +
 .../api/java/operators/FilterOperator.java      |    2 +
 .../api/java/operators/FlatMapOperator.java     |    4 +-
 .../java/operators/GroupCombineOperator.java    |    4 +-
 .../api/java/operators/GroupReduceOperator.java |    6 +-
 .../flink/api/java/operators/JoinOperator.java  |    8 +-
 .../flink/api/java/operators/MapOperator.java   |    2 +
 .../api/java/operators/ReduceOperator.java      |    6 +-
 .../java/operators/SingleInputUdfOperator.java  |   31 +-
 .../api/java/operators/TwoInputUdfOperator.java |   43 +-
 .../flink/api/java/operators/UdfOperator.java   |    1 -
 .../api/java/operators/UdfOperatorUtils.java    |  103 ++
 .../api/java/sca/CodeAnalyzerException.java     |   42 +
 .../flink/api/java/sca/CodeErrorException.java  |   42 +
 .../flink/api/java/sca/ModifiedASMAnalyzer.java |  169 +++
 .../flink/api/java/sca/ModifiedASMFrame.java    |   84 ++
 .../api/java/sca/NestedMethodAnalyzer.java      |  730 ++++++++++
 .../apache/flink/api/java/sca/TaggedValue.java  |  421 ++++++
 .../apache/flink/api/java/sca/UdfAnalyzer.java  |  474 ++++++
 .../flink/api/java/sca/UdfAnalyzerUtils.java    |  329 +++++
 .../SemanticPropertiesPrecedenceTest.java       |  183 +++
 .../api/java/sca/UdfAnalyzerExamplesTest.java   |  707 +++++++++
 .../flink/api/java/sca/UdfAnalyzerTest.java     | 1353 ++++++++++++++++++
 .../apache/flink/test/util/TestEnvironment.java |    4 +-
 pom.xml                                         |    4 +-
 33 files changed, 4916 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-core/src/main/java/org/apache/flink/api/common/CodeAnalysisMode.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/CodeAnalysisMode.java b/flink-core/src/main/java/org/apache/flink/api/common/CodeAnalysisMode.java
new file mode 100644
index 0000000..e9d8541
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/CodeAnalysisMode.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+/**
+ * Specifies to what extent user-defined functions are analyzed in order
+ * to give the Flink optimizer insight into UDF internals and inform
+ * the user about common implementation mistakes.
+ *
+ * The analyzer gives hints about:
+ *  - ForwardedFields semantic properties
+ *  - Warnings if static fields are modified by a Function
+ *  - Warnings if a FilterFunction modifies its input objects
+ *  - Warnings if a Function returns null
+ *  - Warnings if a tuple access uses a wrong index
+ *  - Information about the number of object creations (for manual optimization)
+ */
+public enum CodeAnalysisMode {
+
+	/**
+	 * Code analysis does not take place.
+	 */
+	DISABLE,
+
+	/**
+	 * Hints for improvement of the program are printed to the log.
+	 */
+	HINT,
+
+	/**
+	 * The program will be automatically optimized with knowledge from code
+	 * analysis.
+	 */
+	OPTIMIZE;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 04a518e..4974295 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -44,6 +44,10 @@ import java.util.Map;
  *         handling <i>generic types</i> and <i>POJOs</i>. This is usually only needed
  *         when the functions return not only the types declared in their signature, but
  *         also subclasses of those types.</li>
+ *     <li>The {@link CodeAnalysisMode} of the program: Enable hinting/optimizing or disable
+ *         the "static code analyzer". The static code analyzer pre-interprets user-defined functions in order to
+ *         get implementation insights for program improvements that can be printed to the log or
+ *         automatically applied.</li>
  * </ul>
  */
 public class ExecutionConfig implements Serializable {
@@ -78,6 +82,8 @@ public class ExecutionConfig implements Serializable {
 
 	private boolean forceAvro = false;
 
+	private CodeAnalysisMode codeAnalysisMode = CodeAnalysisMode.DISABLE;
+
 	/** If set to true, progress updates are printed to System.out during execution */
 	private boolean printProgressDuringExecution = true;
 
@@ -316,6 +322,26 @@ public class ExecutionConfig implements Serializable {
 	public boolean isObjectReuseEnabled() {
 		return objectReuse;
 	}
+	
+	/**
+	 * Sets the {@link CodeAnalysisMode} of the program. Specifies to what extent user-defined
+	 * functions are analyzed in order to give the Flink optimizer insight into UDF internals
+	 * and inform the user about common implementation mistakes. The static code analyzer pre-interprets
+	 * user-defined functions in order to get implementation insights for program improvements;
+	 * depending on the mode these are printed to the log, automatically applied, or not computed at all.
+	 * 
+	 * @param codeAnalysisMode see {@link CodeAnalysisMode}
+	 */
+	public void setCodeAnalysisMode(CodeAnalysisMode codeAnalysisMode) {
+		this.codeAnalysisMode = codeAnalysisMode;
+	}
+	
+	/**
+	 * Returns the {@link CodeAnalysisMode} of the program.
+	 */
+	public CodeAnalysisMode getCodeAnalysisMode() {
+		return codeAnalysisMode;
+	}
 
 	/**
 	 * Enables the printing of progress update messages to {@code System.out}

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 6196e82..8879803 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -60,10 +60,10 @@ under the License.
 
 		<dependency>
 			<groupId>org.ow2.asm</groupId>
-			<artifactId>asm</artifactId>
+			<artifactId>asm-all</artifactId>
 			<version>${asm.version}</version>
 		</dependency>
-		
+
 		<dependency>
 			<groupId>com.twitter</groupId>
 			<artifactId>chill_${scala.binary.version}</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 3a5b04f..d50ddb4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -991,6 +991,9 @@ public abstract class ExecutionEnvironment {
 			LOG.debug("Registered Kryo default Serializers: {}", Joiner.on(',').join(config.getDefaultKryoSerializers()));
 			LOG.debug("Registered Kryo default Serializers Classes {}", Joiner.on(',').join(config.getDefaultKryoSerializerClasses()));
 			LOG.debug("Registered POJO types: {}", Joiner.on(',').join(config.getRegisteredPojoTypes()));
+
+			// print information about static code analysis
+			LOG.debug("Static code analysis mode: {}", config.getCodeAnalysisMode());
 		}
 
 		return plan;

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index 38b24a2..dd1d6d2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -29,8 +29,10 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.util.List;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
+import static org.apache.flink.api.java.functions.FunctionAnnotation.SkipCodeAnalysis;
 
 
 public class Utils {
@@ -70,6 +72,7 @@ public class Utils {
 		}
 	}
 
+	@SkipCodeAnalysis
 	public static class CountHelper<T> extends RichFlatMapFunction<T, Long> {
 
 		private static final long serialVersionUID = 1L;
@@ -93,6 +96,7 @@ public class Utils {
 		}
 	}
 
+	@SkipCodeAnalysis
 	public static class CollectHelper<T> extends RichFlatMapFunction<T, T> {
 
 		private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
index 09678fd..bfc1bf0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
@@ -369,7 +369,22 @@ public class FunctionAnnotation {
 	public @interface ReadFieldsSecond {
 		String[] value();
 	}
-	
+
+	/**
+	 * The SkipCodeAnalysis annotation declares that a function will not be analyzed by Flink's
+	 * code analysis capabilities, regardless of the configured {@link org.apache.flink.api.common.CodeAnalysisMode}.
+	 *
+	 * If this annotation is not present and code analysis is enabled, the static code analyzer
+	 * pre-interprets user-defined functions in order to get implementation insights for program
+	 * improvements that are printed to the log as hints or automatically applied (see
+	 * {@link org.apache.flink.api.common.ExecutionConfig}).
+	 *
+	 */
+	@Target(ElementType.TYPE)
+	@Retention(RetentionPolicy.RUNTIME)
+	public @interface SkipCodeAnalysis {
+	}
+
 	/**
 	 * Private constructor to prevent instantiation. This class is intended only as a container.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
index 4569be3..7640e2c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
@@ -19,14 +19,6 @@
 
 package org.apache.flink.api.java.functions;
 
-import java.lang.annotation.Annotation;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.SemanticProperties.InvalidSemanticAnnotationException;
@@ -34,13 +26,13 @@ import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
-import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFieldsSecond;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsFirst;
@@ -48,6 +40,14 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsSecond;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 
+import java.lang.annotation.Annotation;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 public class SemanticPropUtil {
 
 	private final static String REGEX_WILDCARD = "[\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR+"\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA+"]";
@@ -235,7 +235,7 @@ public class SemanticPropUtil {
 	public static SingleInputSemanticProperties getSemanticPropsSingle(
 			Set<Annotation> set, TypeInformation<?> inType, TypeInformation<?> outType) {
 		if (set == null) {
-			return new SingleInputSemanticProperties();
+			return null;
 		}
 		Iterator<Annotation> it = set.iterator();
 
@@ -264,15 +264,14 @@ public class SemanticPropUtil {
 			SingleInputSemanticProperties result = new SingleInputSemanticProperties();
 			getSemanticPropsSingleFromString(result, forwarded, nonForwarded, read, inType, outType);
 			return result;
-		} else {
-			return new SingleInputSemanticProperties();
 		}
+		return null;
 	}
 
 	public static DualInputSemanticProperties getSemanticPropsDual(
 			Set<Annotation> set, TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType) {
 		if (set == null) {
-			return new DualInputSemanticProperties();
+			return null;
 		}
 		Iterator<Annotation> it = set.iterator();
 
@@ -309,15 +308,20 @@ public class SemanticPropUtil {
 			getSemanticPropsDualFromString(result, forwardedFirst, forwardedSecond,
 					nonForwardedFirst, nonForwardedSecond, readFirst, readSecond, inType1, inType2, outType);
 			return result;
-		} else {
-			return new DualInputSemanticProperties();
 		}
+		return null;
+	}
+
+	public static void getSemanticPropsSingleFromString(SingleInputSemanticProperties result,
+														String[] forwarded, String[] nonForwarded, String[] readSet,
+														TypeInformation<?> inType, TypeInformation<?> outType) {
+		getSemanticPropsSingleFromString(result, forwarded, nonForwarded, readSet, inType, outType, false);
 	}
 
 	public static void getSemanticPropsSingleFromString(SingleInputSemanticProperties result,
 			String[] forwarded, String[] nonForwarded, String[] readSet,
-			TypeInformation<?> inType, TypeInformation<?> outType)
-	{
+			TypeInformation<?> inType, TypeInformation<?> outType,
+			boolean skipIncompatibleTypes) {
 
 		boolean hasForwardedAnnotation = false;
 		boolean hasNonForwardedAnnotation = false;
@@ -334,9 +338,9 @@ public class SemanticPropUtil {
 			throw new InvalidSemanticAnnotationException("Either ForwardedFields OR " +
 					"NonForwardedFields annotation permitted, NOT both.");
 		} else if(hasForwardedAnnotation) {
-			parseForwardedFields(result, forwarded, inType, outType, 0);
+			parseForwardedFields(result, forwarded, inType, outType, 0, skipIncompatibleTypes);
 		} else if(hasNonForwardedAnnotation) {
-			parseNonForwardedFields(result, nonForwarded, inType, outType, 0);
+			parseNonForwardedFields(result, nonForwarded, inType, outType, 0, skipIncompatibleTypes);
 		}
 		parseReadFields(result, readSet, inType, 0);
 	}
@@ -345,8 +349,18 @@ public class SemanticPropUtil {
 			String[] forwardedFirst, String[] forwardedSecond,
 			String[] nonForwardedFirst, String[] nonForwardedSecond, String[]
 			readFieldsFirst, String[] readFieldsSecond,
-			TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType)
-	{
+			TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType) {
+		getSemanticPropsDualFromString(result, forwardedFirst, forwardedSecond, nonForwardedFirst,
+				nonForwardedSecond, readFieldsFirst, readFieldsSecond, inType1, inType2,  outType,
+				false);
+	}
+
+	public static void getSemanticPropsDualFromString(DualInputSemanticProperties result,
+			String[] forwardedFirst, String[] forwardedSecond,
+			String[] nonForwardedFirst, String[] nonForwardedSecond, String[]
+			readFieldsFirst, String[] readFieldsSecond,
+			TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType,
+			boolean skipIncompatibleTypes) {
 
 		boolean hasForwardedFirstAnnotation = false;
 		boolean hasForwardedSecondAnnotation = false;
@@ -377,15 +391,15 @@ public class SemanticPropUtil {
 		}
 
 		if(hasForwardedFirstAnnotation) {
-			parseForwardedFields(result, forwardedFirst, inType1, outType, 0);
+			parseForwardedFields(result, forwardedFirst, inType1, outType, 0, skipIncompatibleTypes);
 		} else if(hasNonForwardedFirstAnnotation) {
-			parseNonForwardedFields(result, nonForwardedFirst, inType1, outType, 0);
+			parseNonForwardedFields(result, nonForwardedFirst, inType1, outType, 0, skipIncompatibleTypes);
 		}
 
 		if(hasForwardedSecondAnnotation) {
-			parseForwardedFields(result, forwardedSecond, inType2, outType, 1);
+			parseForwardedFields(result, forwardedSecond, inType2, outType, 1, skipIncompatibleTypes);
 		} else if(hasNonForwardedSecondAnnotation) {
-			parseNonForwardedFields(result, nonForwardedSecond, inType2, outType, 1);
+			parseNonForwardedFields(result, nonForwardedSecond, inType2, outType, 1, skipIncompatibleTypes);
 		}
 
 		parseReadFields(result, readFieldsFirst, inType1, 0);
@@ -393,7 +407,8 @@ public class SemanticPropUtil {
 	}
 
 
-	private static void parseForwardedFields(SemanticProperties sp, String[] forwardedStr, TypeInformation<?> inType, TypeInformation<?> outType, int input) {
+	private static void parseForwardedFields(SemanticProperties sp, String[] forwardedStr,
+			TypeInformation<?> inType, TypeInformation<?> outType, int input, boolean skipIncompatibleTypes) {
 
 		if (forwardedStr == null) {
 			return;
@@ -412,8 +427,13 @@ public class SemanticPropUtil {
 			if (wcMatcher.matches()) {
 
 				if (!inType.equals(outType)) {
-					throw new InvalidSemanticAnnotationException("Forwarded field annotation \"" + s +
-							"\" with wildcard only allowed for identical input and output types.");
+					if (skipIncompatibleTypes) {
+						continue;
+					}
+					else {
+						throw new InvalidSemanticAnnotationException("Forwarded field annotation \"" + s +
+								"\" with wildcard only allowed for identical input and output types.");
+					}
 				}
 
 				for (int i = 0; i < inType.getTotalFields(); i++) {
@@ -440,8 +460,13 @@ public class SemanticPropUtil {
 
 				try {
 					// check type compatibility
-					if (!areFieldsCompatible(sourceStr, inType, targetStr, outType)) {
-						throw new InvalidSemanticAnnotationException("Referenced fields of forwarded field annotation \"" + s + "\" do not match.");
+					if (!areFieldsCompatible(sourceStr, inType, targetStr, outType, !skipIncompatibleTypes)) {
+						if (skipIncompatibleTypes) {
+							continue;
+						}
+						else {
+							throw new InvalidSemanticAnnotationException("Referenced fields of forwarded field annotation \"" + s + "\" do not match.");
+						}
 					}
 					List<FlatFieldDescriptor> inFFDs = getFlatFields(sourceStr, inType);
 					List<FlatFieldDescriptor> outFFDs = getFlatFields(targetStr, outType);
@@ -478,8 +503,13 @@ public class SemanticPropUtil {
 					String fieldStr = fieldMatcher.group();
 					try {
 						// check if field is compatible in input and output type
-						if (!areFieldsCompatible(fieldStr, inType, fieldStr, outType)) {
-							throw new InvalidSemanticAnnotationException("Referenced fields of forwarded field annotation \"" + s + "\" do not match.");
+						if (!areFieldsCompatible(fieldStr, inType, fieldStr, outType, !skipIncompatibleTypes)) {
+							if (skipIncompatibleTypes) {
+								continue;
+							}
+							else {
+								throw new InvalidSemanticAnnotationException("Referenced fields of forwarded field annotation \"" + s + "\" do not match.");
+							}
 						}
 						// add flat field positions
 						List<FlatFieldDescriptor> inFFDs = getFlatFields(fieldStr, inType);
@@ -503,8 +533,8 @@ public class SemanticPropUtil {
 		}
 	}
 
-	private static void parseNonForwardedFields(
-			SemanticProperties sp, String[] nonForwardedStr, TypeInformation<?> inType, TypeInformation<?> outType, int input) {
+	private static void parseNonForwardedFields(SemanticProperties sp, String[] nonForwardedStr,
+			TypeInformation<?> inType, TypeInformation<?> outType, int input, boolean skipIncompatibleTypes) {
 
 		if(nonForwardedStr == null) {
 			return;
@@ -521,7 +551,12 @@ public class SemanticPropUtil {
 			}
 
 			if(!inType.equals(outType)) {
-				throw new InvalidSemanticAnnotationException("Non-forwarded fields annotation only allowed for identical input and output types.");
+				if (skipIncompatibleTypes) {
+					continue;
+				}
+				else {
+					throw new InvalidSemanticAnnotationException("Non-forwarded fields annotation only allowed for identical input and output types.");
+				}
 			}
 
 			Matcher matcher = PATTERN_LIST.matcher(s);
@@ -613,14 +648,24 @@ public class SemanticPropUtil {
 
 	////////////////////// UTIL METHODS ///////////////////////////////
 
-	private static boolean areFieldsCompatible(String sourceField, TypeInformation<?> inType, String targetField, TypeInformation<?> outType) {
-
-		// get source type information
-		TypeInformation<?> sourceType = getExpressionTypeInformation(sourceField, inType);
-		// get target type information
-		TypeInformation<?> targetType = getExpressionTypeInformation(targetField, outType);
+	private static boolean areFieldsCompatible(String sourceField, TypeInformation<?> inType, String targetField,
+			TypeInformation<?> outType, boolean throwException) {
 
-		return (sourceType.equals(targetType));
+		try {
+			// get source type information
+			TypeInformation<?> sourceType = getExpressionTypeInformation(sourceField, inType);
+			// get target type information
+			TypeInformation<?> targetType = getExpressionTypeInformation(targetField, outType);
+			return sourceType.equals(targetType);
+		}
+		catch (InvalidFieldReferenceException e) {
+			if (throwException) {
+				throw e;
+			}
+			else {
+				return false;
+			}
+		}
 	}
 
 	private static TypeInformation<?> getExpressionTypeInformation(String fieldStr, TypeInformation<?> typeInfo) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 115a238..36378b9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -124,6 +124,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 
 		this.keys1 = keys1;
 		this.keys2 = keys2;
+
+		UdfOperatorUtils.analyzeDualInputUdf(this, CoGroupFunction.class, defaultName, function, keys1, keys2);
 	}
 	
 	@Override
@@ -144,9 +146,9 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 			int numFields1 = this.getInput1Type().getTotalFields();
 			int numFields2 = this.getInput2Type().getTotalFields();
 			int offset1 = (this.keys1 instanceof Keys.SelectorFunctionKeys) ?
-					((Keys.SelectorFunctionKeys) this.keys1).getKeyType().getTotalFields() : 0;
+					((Keys.SelectorFunctionKeys<?,?>) this.keys1).getKeyType().getTotalFields() : 0;
 			int offset2 = (this.keys2 instanceof Keys.SelectorFunctionKeys) ?
-					((Keys.SelectorFunctionKeys) this.keys2).getKeyType().getTotalFields() : 0;
+					((Keys.SelectorFunctionKeys<?,?>) this.keys2).getKeyType().getTotalFields() : 0;
 
 			props = SemanticPropUtil.addSourceFieldOffsets(props, numFields1, numFields2, offset1, offset2);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
index 5ed3e40..ae990ce 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
@@ -67,6 +67,8 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 		this.function = function;
 		this.defaultName = defaultName;
 		this.hint = hint;
+
+		UdfOperatorUtils.analyzeDualInputUdf(this, CrossFunction.class, defaultName, function, null, null);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
index f55de1c..70bfa93 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
@@ -41,6 +41,8 @@ public class FilterOperator<T> extends SingleInputUdfOperator<T, T, FilterOperat
 		
 		this.function = function;
 		this.defaultName = defaultName;
+
+		UdfOperatorUtils.analyzeSingleInputUdf(this, FilterFunction.class, defaultName, function, null);
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
index 8caacae..10bb286 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
@@ -43,13 +43,15 @@ public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, Fl
 		
 		this.function = function;
 		this.defaultName = defaultName;
+
+		UdfOperatorUtils.analyzeSingleInputUdf(this, FlatMapFunction.class, defaultName, function, null);
 	}
 	
 	@Override
 	protected FlatMapFunction<IN, OUT> getFunction() {
 		return function;
 	}
-	
+
 	@Override
 	protected FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN,OUT>> translateToDataFlow(Operator<IN> input) {
 		String name = getName() != null ? getName() : "FlatMap at "+defaultName;

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
index dc26fec..30cb0be 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
@@ -98,9 +98,9 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 				this.grouper != null &&
 				this.grouper.keys instanceof Keys.SelectorFunctionKeys) {
 
-			int offset = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields();
+			int offset = ((Keys.SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
 			if(this.grouper instanceof SortedGrouping) {
-				offset += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+				offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
 			}
 
 			props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset);

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index bc4413f..fcbb888 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -87,6 +87,8 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 		this.defaultName = defaultName;
 
 		checkCombinability();
+
+		UdfOperatorUtils.analyzeSingleInputUdf(this, GroupReduceFunction.class, defaultName, function, grouper.keys);
 	}
 
 	private void checkCombinability() {
@@ -132,9 +134,9 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 				this.grouper != null &&
 				this.grouper.keys instanceof Keys.SelectorFunctionKeys) {
 
-			int offset = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields();
+			int offset = ((Keys.SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
 			if(this.grouper instanceof SortedGrouping) {
-				offset += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+				offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
 			}
 			props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 4adf6b3..1e5baab 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -202,6 +202,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			
 			this.function = function;
 			this.joinLocationName = joinLocationName;
+
+			UdfOperatorUtils.analyzeDualInputUdf(this, FlatJoinFunction.class, joinLocationName, function, keys1, keys2);
 		}
 
 		public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
@@ -217,6 +219,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			}
 
 			this.function = generatedFunction;
+
+			UdfOperatorUtils.analyzeDualInputUdf(this, JoinFunction.class, joinLocationName, function, keys1, keys2);
 		}
 		
 		@Override
@@ -237,9 +241,9 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 				int numFields1 = this.getInput1Type().getTotalFields();
 				int numFields2 = this.getInput2Type().getTotalFields();
 				int offset1 = (this.keys1 instanceof Keys.SelectorFunctionKeys) ?
-						((Keys.SelectorFunctionKeys) this.keys1).getKeyType().getTotalFields() : 0;
+						((Keys.SelectorFunctionKeys<?,?>) this.keys1).getKeyType().getTotalFields() : 0;
 				int offset2 = (this.keys2 instanceof Keys.SelectorFunctionKeys) ?
-						((Keys.SelectorFunctionKeys) this.keys2).getKeyType().getTotalFields() : 0;
+						((Keys.SelectorFunctionKeys<?,?>) this.keys2).getKeyType().getTotalFields() : 0;
 
 				props = SemanticPropUtil.addSourceFieldOffsets(props, numFields1, numFields2, offset1, offset2);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
index 2663a2a..eaaeb38 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
@@ -45,6 +45,8 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe
 		
 		this.defaultName = defaultName;
 		this.function = function;
+
+		UdfOperatorUtils.analyzeSingleInputUdf(this, MapFunction.class, defaultName, function, null);
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index e770278..1193da5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -72,6 +72,8 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 		this.function = function;
 		this.grouper = input;
 		this.defaultName = defaultName;
+
+		UdfOperatorUtils.analyzeSingleInputUdf(this, ReduceFunction.class, defaultName, function, grouper.keys);
 	}
 	
 	@Override
@@ -89,9 +91,9 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 				this.grouper != null &&
 				this.grouper.keys instanceof Keys.SelectorFunctionKeys) {
 
-			int offset = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields();
+			int offset = ((Keys.SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
 			if(this.grouper instanceof SortedGrouping) {
-				offset += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+				offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
 			}
 			props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
index f55489f..9301e1a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
@@ -54,8 +54,11 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 
 	private Map<String, DataSet<?>> broadcastVariables;
 
+	// NOTE: only set this variable via setSemanticProperties()
 	private SingleInputSemanticProperties udfSemantics;
 
+	private boolean analyzedUdfSemantics;
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -157,11 +160,12 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 
 		if(this.udfSemantics == null) {
 			// extract semantic properties from function annotations
-			this.udfSemantics = extractSemanticAnnotations(getFunction().getClass());
+			setSemanticProperties(extractSemanticAnnotations(getFunction().getClass()));
 		}
 
-		if(this.udfSemantics == null) {
-			this.udfSemantics = new SingleInputSemanticProperties();
+		if(this.udfSemantics == null
+				|| this.analyzedUdfSemantics) { // discard analyzed semantic properties
+			setSemanticProperties(new SingleInputSemanticProperties());
 			SemanticPropUtil.getSemanticPropsSingleFromString(this.udfSemantics, forwardedFields, null, null, this.getInputType(), this.getResultType());
 		} else {
 			if(udfWithForwardedFieldsAnnotation(getFunction().getClass())) {
@@ -311,11 +315,15 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 
 	@Override
 	public SingleInputSemanticProperties getSemanticProperties() {
-		if (this.udfSemantics == null) {
+		if (this.udfSemantics == null || analyzedUdfSemantics) {
 			SingleInputSemanticProperties props = extractSemanticAnnotations(getFunction().getClass());
-			this.udfSemantics = props != null ? props : new SingleInputSemanticProperties();
+			if (props != null) {
+				setSemanticProperties(props);
+			}
+		}
+		if (this.udfSemantics == null) {
+			setSemanticProperties(new SingleInputSemanticProperties());
 		}
-		
 		return this.udfSemantics;
 	}
 
@@ -329,8 +337,17 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 	 */
 	public void setSemanticProperties(SingleInputSemanticProperties properties) {
 		this.udfSemantics = properties;
+		this.analyzedUdfSemantics = false;
 	}
-	
+
+	protected boolean getAnalyzedUdfSemanticsFlag() {
+		return this.analyzedUdfSemantics;
+	}
+
+	protected void setAnalyzedUdfSemanticsFlag() {
+		this.analyzedUdfSemantics = true;
+	}
+
 	protected SingleInputSemanticProperties extractSemanticAnnotations(Class<?> udfClass) {
 		Set<Annotation> annotations = FunctionAnnotation.readSingleForwardAnnotations(udfClass);
 		return SemanticPropUtil.getSemanticPropsSingle(annotations, getInputType(), getResultType());

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
index 91f9f7e..d23dd56 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
@@ -29,12 +29,12 @@ import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.DataSet;
 
 /**
  * The <tt>TwoInputUdfOperator</tt> is the base class of all binary operators that execute
@@ -56,8 +56,11 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 
 	private Map<String, DataSet<?>> broadcastVariables;
 
+	// NOTE: only set this variable via setSemanticProperties()
 	private DualInputSemanticProperties udfSemantics;
 
+	private boolean analyzedUdfSemantics;
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -157,13 +160,13 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 	 */
 	@SuppressWarnings("unchecked")
 	public O withForwardedFieldsFirst(String... forwardedFieldsFirst) {
-		if (this.udfSemantics == null) {
+		if (this.udfSemantics == null || this.analyzedUdfSemantics) {
 			// extract semantic properties from function annotations
-			this.udfSemantics = extractSemanticAnnotationsFromUdf(getFunction().getClass());
+			setSemanticProperties(extractSemanticAnnotationsFromUdf(getFunction().getClass()));
 		}
 
-		if(this.udfSemantics == null) {
-			this.udfSemantics = new DualInputSemanticProperties();
+		if(this.udfSemantics == null || this.analyzedUdfSemantics) {
+			setSemanticProperties(new DualInputSemanticProperties());
 			SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, forwardedFieldsFirst, null,
 					null, null, null, null, getInput1Type(), getInput2Type(), getResultType());
 		} else {
@@ -232,13 +235,13 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 	 */
 	@SuppressWarnings("unchecked")
 	public O withForwardedFieldsSecond(String... forwardedFieldsSecond) {
-		if (this.udfSemantics == null) {
+		if (this.udfSemantics == null || this.analyzedUdfSemantics) {
 			// extract semantic properties from function annotations
-			this.udfSemantics = extractSemanticAnnotationsFromUdf(getFunction().getClass());
+			setSemanticProperties(extractSemanticAnnotationsFromUdf(getFunction().getClass()));
 		}
 
-		if(this.udfSemantics == null) {
-			this.udfSemantics = new DualInputSemanticProperties();
+		if(this.udfSemantics == null || this.analyzedUdfSemantics) {
+			setSemanticProperties(new DualInputSemanticProperties());
 			SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, null, forwardedFieldsSecond,
 					null, null, null, null, getInput1Type(), getInput2Type(), getResultType());
 		} else {
@@ -390,11 +393,15 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 
 	@Override
 	public DualInputSemanticProperties getSemanticProperties() {
-		if (this.udfSemantics == null) {
+		if (this.udfSemantics == null || analyzedUdfSemantics) {
 			DualInputSemanticProperties props = extractSemanticAnnotationsFromUdf(getFunction().getClass());
-			this.udfSemantics = props != null ? props : new DualInputSemanticProperties();
+			if (props != null) {
+				setSemanticProperties(props);
+			}
+		}
+		if (this.udfSemantics == null) {
+			setSemanticProperties(new DualInputSemanticProperties());
 		}
-		
 		return this.udfSemantics;
 	}
 
@@ -408,9 +415,17 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 	 */
 	public void setSemanticProperties(DualInputSemanticProperties properties) {
 		this.udfSemantics = properties;
+		this.analyzedUdfSemantics = false;
 	}
-	
-	
+
+	protected boolean getAnalyzedUdfSemanticsFlag() {
+		return this.analyzedUdfSemantics;
+	}
+
+	protected void setAnalyzedUdfSemanticsFlag() {
+		this.analyzedUdfSemantics = true;
+	}
+
 	protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class<?> udfClass) {
 		Set<Annotation> annotations = FunctionAnnotation.readDualForwardAnnotations(udfClass);
 		return SemanticPropUtil.getSemanticPropsDual(annotations, getInput1Type(), getInput2Type(), getResultType());

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
index 026cc61..924c84f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
@@ -57,7 +57,6 @@ public interface UdfOperator<O extends UdfOperator<O>> {
 	
 	/**
 	 * Gets the semantic properties that have been set for the user-defined functions (UDF).
-	 * This method may return null, if no semantic properties have been set so far.
 	 * 
 	 * @return The semantic properties of the UDF.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
new file mode 100644
index 0000000..52a0d08
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators;
+
+import org.apache.flink.api.common.CodeAnalysisMode;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.sca.CodeAnalyzerException;
+import org.apache.flink.api.java.sca.UdfAnalyzer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class UdfOperatorUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(UdfOperatorUtils.class);
+
+	public static void analyzeSingleInputUdf(SingleInputUdfOperator<?, ?, ?> operator, Class<?> udfBaseClass,
+			String defaultName, Function udf, Keys<?> key) {
+		final CodeAnalysisMode mode = operator.getExecutionEnvironment().getConfig().getCodeAnalysisMode();
+		if (mode != CodeAnalysisMode.DISABLE
+				&& !udf.getClass().isAnnotationPresent(FunctionAnnotation.SkipCodeAnalysis.class)) {
+			final String operatorName = operator.getName() != null ? operator.getName()
+					: udfBaseClass.getSimpleName() + " at "+defaultName;
+			try {
+				final UdfAnalyzer analyzer = new UdfAnalyzer(udfBaseClass, udf.getClass(), operatorName, operator.getInputType(), null,
+						operator.getResultType(), key, null, mode == CodeAnalysisMode.OPTIMIZE);
+				final boolean success = analyzer.analyze();
+				if (success) {
+					if (mode == CodeAnalysisMode.OPTIMIZE
+							&& !operator.udfWithForwardedFieldsAnnotation(udf.getClass())) {
+						analyzer.addSemanticPropertiesHints();
+						operator.setSemanticProperties((SingleInputSemanticProperties) analyzer.getSemanticProperties());
+						operator.setAnalyzedUdfSemanticsFlag();
+					}
+					else if (mode == CodeAnalysisMode.HINT) {
+						analyzer.addSemanticPropertiesHints();
+					}
+					analyzer.printToLogger(LOG);
+				}
+			}
+			catch (InvalidTypesException e) {
+				LOG.debug("Unable to do code analysis due to missing type information.", e);
+			}
+			catch (CodeAnalyzerException e) {
+				LOG.debug("Code analysis failed.", e);
+			}
+		}
+	}
+
+	public static void analyzeDualInputUdf(TwoInputUdfOperator<?, ?, ?, ?> operator, Class<?> udfBaseClass,
+			String defaultName, Function udf, Keys<?> key1, Keys<?> key2) {
+		final CodeAnalysisMode mode = operator.getExecutionEnvironment().getConfig().getCodeAnalysisMode();
+		if (mode != CodeAnalysisMode.DISABLE
+				&& !udf.getClass().isAnnotationPresent(FunctionAnnotation.SkipCodeAnalysis.class)) {
+			final String operatorName = operator.getName() != null ? operator.getName()
+					: udfBaseClass.getSimpleName() + " at " + defaultName;
+			try {
+				final UdfAnalyzer analyzer = new UdfAnalyzer(udfBaseClass, udf.getClass(), operatorName, operator.getInput1Type(),
+						operator.getInput2Type(), operator.getResultType(), key1, key2,
+						mode == CodeAnalysisMode.OPTIMIZE);
+				final boolean success = analyzer.analyze();
+				if (success) {
+					if (mode == CodeAnalysisMode.OPTIMIZE
+							&& !(operator.udfWithForwardedFieldsFirstAnnotation(udf.getClass())
+							|| operator.udfWithForwardedFieldsSecondAnnotation(udf.getClass()))) {
+						analyzer.addSemanticPropertiesHints();
+						operator.setSemanticProperties((DualInputSemanticProperties) analyzer.getSemanticProperties());
+						operator.setAnalyzedUdfSemanticsFlag();
+					}
+					else if (mode == CodeAnalysisMode.HINT) {
+						analyzer.addSemanticPropertiesHints();
+					}
+					analyzer.printToLogger(LOG);
+				}
+			}
+			catch (InvalidTypesException e) {
+				LOG.debug("Unable to do code analysis due to missing type information.", e);
+			}
+			catch (CodeAnalyzerException e) {
+				LOG.debug("Code analysis failed.", e);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeAnalyzerException.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeAnalyzerException.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeAnalyzerException.java
new file mode 100644
index 0000000..e42ae67
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeAnalyzerException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.sca;
+
+/**
+ * Exception that is thrown if code analysis could not run properly.
+ */
+public class CodeAnalyzerException extends RuntimeException {
+	private static final long serialVersionUID = 1L;
+
+	public CodeAnalyzerException() {
+		super();
+	}
+
+	public CodeAnalyzerException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public CodeAnalyzerException(String message) {
+		super(message);
+	}
+
+	public CodeAnalyzerException(Throwable cause) {
+		super(cause);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeErrorException.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeErrorException.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeErrorException.java
new file mode 100644
index 0000000..9afe5d8
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeErrorException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.sca;
+
+/**
+ * Exception that is thrown if code errors are found during analysis.
+ */
+public class CodeErrorException extends RuntimeException {
+	private static final long serialVersionUID = 1L;
+
+	public CodeErrorException() {
+		super();
+	}
+
+	public CodeErrorException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public CodeErrorException(String message) {
+		super(message);
+	}
+
+	public CodeErrorException(Throwable cause) {
+		super(cause);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMAnalyzer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMAnalyzer.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMAnalyzer.java
new file mode 100644
index 0000000..4c0d020
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMAnalyzer.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sca;
+
+import org.objectweb.asm.tree.AbstractInsnNode;
+import org.objectweb.asm.tree.InsnList;
+import org.objectweb.asm.tree.JumpInsnNode;
+import org.objectweb.asm.tree.analysis.Analyzer;
+import org.objectweb.asm.tree.analysis.Frame;
+import org.objectweb.asm.tree.analysis.Interpreter;
+
+import java.lang.reflect.Field;
+
+/**
+ * Modified version of ASM's Analyzer. It defines a custom ASM Frame
+ * and allows jump modification which is necessary for UDFs with
+ * one iterable input e.g. GroupReduce.
+ * (see also UdfAnalyzer's field "iteratorTrueAssumptionApplied")
+ */
+public class ModifiedASMAnalyzer extends Analyzer {
+
+	private NestedMethodAnalyzer interpreter;
+
+	public ModifiedASMAnalyzer(Interpreter interpreter) {
+		super(interpreter);
+		this.interpreter = (NestedMethodAnalyzer) interpreter;
+	}
+
+	protected Frame newFrame(int nLocals, int nStack) {
+		return new ModifiedASMFrame(nLocals, nStack);
+	}
+
+	protected Frame newFrame(Frame src) {
+		return new ModifiedASMFrame(src);
+	}
+
+	// type of jump modification
+	private int jumpModification = NO_MOD;
+	private static final int NO_MOD = -1;
+	private static final int IFEQ_MOD = 0;
+	private static final int IFNE_MOD = 1;
+	private int eventInsn;
+
+	// current state of modification
+	private int jumpModificationState = DO_NOTHING;
+	private static final int DO_NOTHING = -1;
+	private static final int PRE_STATE = 0;
+	private static final int MOD_STATE = 1;
+	private static final int WAIT_FOR_INSN_STATE = 2;
+
+	public void requestIFEQLoopModification() {
+		if (jumpModificationState != DO_NOTHING) {
+			throw new CodeAnalyzerException("Unable to do jump modifications (unsupported nested jumping).");
+		}
+		jumpModification = IFEQ_MOD;
+		jumpModificationState = PRE_STATE;
+	}
+
+	public void requestIFNELoopModification() {
+		if (jumpModificationState != DO_NOTHING) {
+			throw new CodeAnalyzerException("Unable to do jump modifications (unsupported nested jumping).");
+		}
+		jumpModification = IFNE_MOD;
+		jumpModificationState = PRE_STATE;
+	}
+
+	@Override
+	protected void newControlFlowEdge(int insn, int successor) {
+		try {
+			if (jumpModificationState == PRE_STATE) {
+				jumpModificationState = MOD_STATE;
+			}
+			else if (jumpModificationState == MOD_STATE) {
+				// this modification swaps the top 2 values on the queue stack
+				// it ensures that the "TRUE" path will be executed first, which is important
+				// for a later merge
+				if (jumpModification == IFEQ_MOD) {
+					final int top = accessField(Analyzer.class, "top").getInt(this);
+					final int[] queue = (int[]) accessField(Analyzer.class, "queue").get(this);
+
+					final int tmp = queue[top - 2];
+					queue[top - 2] = queue[top - 1];
+					queue[top - 1] = tmp;
+
+					eventInsn = queue[top - 2] - 1;
+					final InsnList insns = (InsnList) accessField(Analyzer.class, "insns").get(this);
+					// check if instruction is a goto instruction
+					// if yes, this is a loop structure
+					if (insns.get(eventInsn) instanceof JumpInsnNode) {
+						jumpModificationState = WAIT_FOR_INSN_STATE;
+					}
+					// no loop -> end of modification
+					else {
+						jumpModificationState = DO_NOTHING;
+					}
+				}
+				// this modification changes the merge priority of certain frames (the expression part of the IF)
+				else if (jumpModification == IFNE_MOD) {
+					final Frame[] frames = (Frame[]) accessField(Analyzer.class, "frames").get(this);
+					final Field indexField = accessField(AbstractInsnNode.class, "index");
+
+					final InsnList insns = (InsnList) accessField(Analyzer.class, "insns").get(this);
+					final AbstractInsnNode gotoInsnn = insns.get(successor - 1);
+					// check for a loop
+					if (gotoInsnn instanceof JumpInsnNode) {
+						jumpModificationState = WAIT_FOR_INSN_STATE;
+						// sets a merge priority for all instructions (the expression of the IF)
+						// from the label the goto instruction points to until the evaluation with IFEQ
+						final int idx = indexField.getInt(accessField(JumpInsnNode.class, "label").get(gotoInsnn));
+
+						for (int i=idx; i <= insn; i++) {
+							((ModifiedASMFrame) frames[i]).mergePriority = true;
+						}
+						eventInsn = idx - 2;
+					}
+					else {
+						jumpModificationState = DO_NOTHING;
+					}
+				}
+			}
+			// wait for the goto instruction
+			else if (jumpModificationState == WAIT_FOR_INSN_STATE && insn == eventInsn) {
+				jumpModificationState = DO_NOTHING;
+				final Frame[] frames = (Frame[]) accessField(Analyzer.class, "frames").get(this);
+				// merge the goto instruction frame with the frame that follows
+				// this ensures that local variables are kept
+				if (jumpModification == IFEQ_MOD) {
+					interpreter.rightMergePriority = true;
+					final Field top = accessField(Frame.class, "top");
+					top.setInt(frames[eventInsn], top.getInt(frames[eventInsn + 1]));
+					frames[eventInsn + 1].merge(frames[eventInsn], interpreter);
+				}
+				// finally set a merge priority for the last instruction of the loop (before the IF expression)
+				else if (jumpModification == IFNE_MOD) {
+					((ModifiedASMFrame) frames[eventInsn + 1]).mergePriority = true;
+				}
+			}
+		}
+		catch (Exception e) {
+			throw new CodeAnalyzerException("Unable to do jump modifications.", e);
+		}
+	}
+
+	private Field accessField(Class<?> clazz, String name) {
+		for (Field f : clazz.getDeclaredFields()) {
+			if (f.getName().equals(name)) {
+				f.setAccessible(true);
+				return f;
+			}
+		}
+		return null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMFrame.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMFrame.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMFrame.java
new file mode 100644
index 0000000..497a15c
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMFrame.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sca;
+
+import org.objectweb.asm.tree.AbstractInsnNode;
+import org.objectweb.asm.tree.analysis.AnalyzerException;
+import org.objectweb.asm.tree.analysis.Frame;
+import org.objectweb.asm.tree.analysis.Interpreter;
+import org.objectweb.asm.tree.analysis.Value;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+
+/**
+ * Modified version of ASM's {@link Frame}. It supports a per-frame merge
+ * priority flag and hands the currently executing frame over to the
+ * interpreter.
+ *
+ * The interpreter passed to {@link #execute} and {@link #merge} is cast to
+ * {@link NestedMethodAnalyzer}, and frames passed to {@link #init} and
+ * {@link #merge} are cast to {@link ModifiedASMFrame}; other types result
+ * in a ClassCastException.
+ */
+public class ModifiedASMFrame extends Frame {
+
+	/**
+	 * If set on the frame that is merged into another frame, the
+	 * interpreter's rightMergePriority flag is raised for the duration of
+	 * that merge. The flag is cleared again once the merge has completed.
+	 */
+	public boolean mergePriority;
+
+	/**
+	 * Creates a frame with the given number of local variables and the
+	 * given stack size.
+	 */
+	public ModifiedASMFrame(int nLocals, int nStack) {
+		super(nLocals, nStack);
+	}
+
+	/**
+	 * Creates a frame from the given source frame.
+	 */
+	public ModifiedASMFrame(Frame src) {
+		super(src);
+	}
+
+	/**
+	 * Takes over the merge priority of the source frame before delegating
+	 * to the regular frame initialization.
+	 */
+	@Override
+	public Frame init(Frame src) {
+		final ModifiedASMFrame source = (ModifiedASMFrame) src;
+		this.mergePriority = source.mergePriority;
+		return super.init(src);
+	}
+
+	/**
+	 * Registers this frame as the interpreter's current frame and then
+	 * executes the instruction as usual.
+	 */
+	@Override
+	public void execute(AbstractInsnNode insn, Interpreter interpreter)
+			throws AnalyzerException {
+		final NestedMethodAnalyzer analyzer = (NestedMethodAnalyzer) interpreter;
+		analyzer.currentFrame = this;
+		super.execute(insn, interpreter);
+	}
+
+	/**
+	 * Merges the given frame into this frame. If the given frame carries a
+	 * merge priority, the interpreter is told to prioritize the right side
+	 * during this merge. After the merge both the interpreter flag and the
+	 * given frame's merge priority are reset.
+	 *
+	 * @return the result of the underlying frame merge
+	 */
+	@Override
+	public boolean merge(Frame frame, Interpreter interpreter) throws AnalyzerException {
+		final ModifiedASMFrame other = (ModifiedASMFrame) frame;
+		if (other.mergePriority) {
+			((NestedMethodAnalyzer) interpreter).rightMergePriority = true;
+		}
+		final boolean changed = super.merge(frame, interpreter);
+		// reset the flags so that they only affect this single merge
+		final NestedMethodAnalyzer analyzer = (NestedMethodAnalyzer) interpreter;
+		analyzer.rightMergePriority = false;
+		other.mergePriority = false;
+		return changed;
+	}
+
+	/**
+	 * Debugging aid: prints the content of the "values" field of the
+	 * underlying {@link Frame}, which is read via reflection. Any failure
+	 * is rethrown wrapped in a RuntimeException.
+	 */
+	@Override
+	public String toString() {
+		try {
+			final Field valuesField = Frame.class.getDeclaredField("values");
+			valuesField.setAccessible(true);
+			return Arrays.toString((Value[]) valuesField.get(this));
+		}
+		catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+}
\ No newline at end of file


Mime
View raw message