flink-commits mailing list archives

From aljos...@apache.org
Subject [6/6] flink git commit: [FLINK-947] Add a declarative Expression API
Date Mon, 23 Feb 2015 15:05:30 GMT
[FLINK-947] Add a declarative Expression API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e868d596
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e868d596
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e868d596

Branch: refs/heads/master
Commit: e868d59611126795d284d66b9c5674a1f61a90a1
Parents: 98da559
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Wed Oct 1 13:12:18 2014 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Feb 23 13:09:08 2015 +0100

----------------------------------------------------------------------
 docs/_includes/sidenav.html                     |   1 +
 docs/linq.md                                    |  80 +++
 .../api/common/typeinfo/BasicTypeInfo.java      |  44 +-
 .../api/common/typeinfo/FractionalTypeInfo.java |  32 +
 .../api/common/typeinfo/IntegerTypeInfo.java    |  32 +
 .../api/common/typeinfo/NumericTypeInfo.java    |  33 +
 .../common/typeinfo/PrimitiveArrayTypeInfo.java |   9 +
 .../api/common/typeinfo/TypeInformation.java    |  12 +
 .../apache/flink/api/java/operators/Keys.java   |  12 +-
 .../flink/api/java/typeutils/PojoField.java     |  37 +-
 .../flink/api/scala/codegen/TreeGen.scala       |   6 +-
 .../api/scala/typeutils/CaseClassTypeInfo.scala |  13 +-
 .../api/scala/typeutils/EitherTypeInfo.scala    |   3 +
 .../api/scala/typeutils/OptionTypeInfo.scala    |   4 +
 .../scala/typeutils/TraversableTypeInfo.scala   |   4 +
 .../flink/api/scala/typeutils/TryTypeInfo.scala |   4 +-
 flink-staging/flink-expressions/pom.xml         | 234 +++++++
 .../examples/java/JavaExpressionExample.java    |  78 +++
 .../api/expressions/ExpressionException.scala   |  23 +
 .../api/expressions/ExpressionOperation.scala   | 170 +++++
 .../org/apache/flink/api/expressions/Row.scala  |  38 ++
 .../api/expressions/analysis/Analyzer.scala     |  38 ++
 .../analysis/ExtractEquiJoinFields.scala        |  70 +++
 .../expressions/analysis/GroupByAnalyzer.scala  |  48 ++
 .../expressions/analysis/InsertAutoCasts.scala  |  91 +++
 .../analysis/PredicateAnalyzer.scala            |  32 +
 .../analysis/ResolveFieldReferences.scala       |  57 ++
 .../flink/api/expressions/analysis/Rule.scala   |  30 +
 .../analysis/SelectionAnalyzer.scala            |  33 +
 .../api/expressions/analysis/TypeCheck.scala    |  56 ++
 .../expressions/analysis/VerifyBoolean.scala    |  40 ++
 .../analysis/VerifyNoNestedAggregates.scala     |  52 ++
 .../codegen/ExpressionCodeGenerator.scala       | 630 +++++++++++++++++++
 .../codegen/GenerateBinaryPredicate.scala       |  73 +++
 .../codegen/GenerateBinaryResultAssembler.scala |  60 ++
 .../codegen/GenerateResultAssembler.scala       |  99 +++
 .../codegen/GenerateUnaryPredicate.scala        |  67 ++
 .../codegen/GenerateUnaryResultAssembler.scala  |  57 ++
 .../flink/api/expressions/codegen/package.scala |  25 +
 .../operations/ExpandAggregations.scala         | 144 +++++
 .../operations/OperationTranslator.scala        |  35 ++
 .../api/expressions/operations/operations.scala | 101 +++
 .../api/expressions/operations/package.scala    |  24 +
 .../runtime/ExpressionAggregateFunction.scala   |  72 +++
 .../runtime/ExpressionFilterFunction.scala      |  47 ++
 .../runtime/ExpressionJoinFunction.scala        |  76 +++
 .../runtime/ExpressionSelectFunction.scala      |  51 ++
 .../flink/api/expressions/runtime/package.scala |  23 +
 .../flink/api/expressions/tree/Expression.scala | 149 +++++
 .../api/expressions/tree/aggregations.scala     |  99 +++
 .../flink/api/expressions/tree/arithmetic.scala | 145 +++++
 .../flink/api/expressions/tree/cast.scala       |  24 +
 .../flink/api/expressions/tree/comparison.scala |  93 +++
 .../api/expressions/tree/fieldExpression.scala  |  41 ++
 .../flink/api/expressions/tree/literals.scala   |  40 ++
 .../flink/api/expressions/tree/logic.scala      |  58 ++
 .../flink/api/expressions/tree/package.scala    |  29 +
 .../expressions/tree/stringExpressions.scala    |  46 ++
 .../expressions/typeinfo/RenameOperator.scala   |  36 ++
 .../typeinfo/RenamingProxyTypeInfo.scala        | 109 ++++
 .../expressions/typeinfo/RowSerializer.scala    | 121 ++++
 .../api/expressions/typeinfo/RowTypeInfo.scala  |  51 ++
 .../scala/expressions/DataSetConversions.scala  |  66 ++
 .../expressions/DataStreamConversions.scala     |  65 ++
 .../scala/expressions/JavaBatchTranslator.scala | 356 +++++++++++
 .../expressions/JavaStreamingTranslator.scala   | 278 ++++++++
 .../expressions/ScalaBatchTranslator.scala      |  55 ++
 .../expressions/ScalaStreamingTranslator.scala  |  56 ++
 .../api/scala/expressions/expressionDsl.scala   | 117 ++++
 .../flink/api/scala/expressions/package.scala   | 102 +++
 .../examples/scala/PageRankExpression.scala     | 210 +++++++
 .../scala/StreamingExpressionFilter.scala       |  90 +++
 .../examples/scala/TPCHQuery3Expression.scala   | 174 +++++
 .../scala/expressions/AggregationsITCase.scala  | 126 ++++
 .../flink/api/scala/expressions/AsITCase.scala  | 126 ++++
 .../api/scala/expressions/CastingITCase.scala   |  93 +++
 .../scala/expressions/ExpressionsITCase.scala   | 126 ++++
 .../api/scala/expressions/FilterITCase.scala    | 150 +++++
 .../GroupedAggreagationsITCase.scala            |  99 +++
 .../api/scala/expressions/JoinITCase.scala      | 132 ++++
 .../expressions/PageRankExpressionITCase.java   | 100 +++
 .../api/scala/expressions/SelectITCase.scala    | 130 ++++
 .../expressions/StringExpressionsITCase.scala   |  97 +++
 flink-staging/pom.xml                           |   1 +
 84 files changed, 6757 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/docs/_includes/sidenav.html
----------------------------------------------------------------------
diff --git a/docs/_includes/sidenav.html b/docs/_includes/sidenav.html
index 3f65a76..526d6ed 100644
--- a/docs/_includes/sidenav.html
+++ b/docs/_includes/sidenav.html
@@ -39,6 +39,7 @@ under the License.
   <li><div class="sidenav-item"><a href="streaming_guide.html">Streaming Guide</a></div></li>
   <li><div class="sidenav-item"><a href="iterations.html">Iterations</a></div></li>
   <li><div class="sidenav-item"><a href="spargel_guide.html">Spargel Graph API</a></div></li>
+  <li><div class="sidenav-item"><a href="linq.html">Language-Integrated Queries</a></div></li>
   <li><div class="sidenav-item-bottom"><a href="hadoop_compatibility.html">Hadoop Compatibility</a></div></li>
 
   <li><div class="sidenav-category">Examples</div></li>

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/docs/linq.md
----------------------------------------------------------------------
diff --git a/docs/linq.md b/docs/linq.md
new file mode 100644
index 0000000..ebb0063
--- /dev/null
+++ b/docs/linq.md
@@ -0,0 +1,80 @@
+---
+title: "Language-Integrated Queries"
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+**Language-Integrated Queries are an experimental feature and can currently only be used with
+the Scala API**
+
+Flink provides an API that allows specifying operations using SQL-like expressions. This
+Expression API is enabled by importing `org.apache.flink.api.scala.expressions._`, which brings
+implicit conversions into scope for turning a `DataSet` or `DataStream` into an
+`ExpressionOperation` on which relational queries can be specified. This example shows how a
+`DataSet` is converted, how expression operations are specified, and how an expression
+operation is converted back to a `DataSet`:
+
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._ 
+
+case class WC(word: String, count: Int)
+val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
+val expr = input.toExpression
+val result = expr.groupBy('word).select('word, 'count.sum).as[WC]
+{% endhighlight %}
+
+The expression DSL uses Scala symbols to refer to field names, and we use code generation to
+transform expressions into efficient runtime code. Please note that the conversion to and from
+expression operations only works when using Scala case classes or Flink POJOs. Please check out
+the [programming guide](programming_guide.html) to learn the requirements for a class to be
+considered a POJO.
+ 
+Here is another example that shows how you can join two operations:
+
+{% highlight scala %}
+case class MyResult(a: String, b: Int)
+
+val input1 = env.fromElements(...).as('a, 'b)
+val input2 = env.fromElements(...).as('c, 'd)
+val joined = input1.join(input2).where('b === 'a && 'd > 42).select('a, 'd).as[MyResult]
+{% endhighlight %}
+
+Notice how a `DataSet` can be converted to an expression operation by using `as` and specifying new
+names for the fields. This can also be used to disambiguate fields before a join operation.
+
+The Expression API can be used with the Streaming API, since we also have implicit conversions to
+and from `DataStream`.
+
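+For example, filtering a stream could look like this (a sketch, assuming the `DataStream`
+conversions mirror the `DataSet` conversions shown above):
+
+{% highlight scala %}
+case class Reading(sensor: String, value: Double)
+
+val readings: DataStream[Reading] = ...
+val alerts = readings.toExpression
+  .filter('value > 42)
+  .as[Reading]
+{% endhighlight %}
+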
+The following dependency must be added to your project when using the Expression API:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-expressions</artifactId>
+  <version>{{site.FLINK_VERSION_SHORT }}</version>
+</dependency>
+{% endhighlight %}
+
+Please refer to the scaladoc for a full list of supported operations and a description of the
+expression syntax. 

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
index aa0a445..07ca478 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
@@ -55,37 +55,51 @@ import org.apache.flink.api.common.typeutils.base.VoidSerializer;
  */
 public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
 
-	private static final long serialVersionUID = 1L;
-	
-	public static final BasicTypeInfo<String> STRING_TYPE_INFO = new BasicTypeInfo<String>(String.class, StringSerializer.INSTANCE, StringComparator.class);
-	public static final BasicTypeInfo<Boolean> BOOLEAN_TYPE_INFO = new BasicTypeInfo<Boolean>(Boolean.class, BooleanSerializer.INSTANCE, BooleanComparator.class);
-	public static final BasicTypeInfo<Byte> BYTE_TYPE_INFO = new BasicTypeInfo<Byte>(Byte.class, ByteSerializer.INSTANCE, ByteComparator.class);
-	public static final BasicTypeInfo<Short> SHORT_TYPE_INFO = new BasicTypeInfo<Short>(Short.class, ShortSerializer.INSTANCE, ShortComparator.class);
-	public static final BasicTypeInfo<Integer> INT_TYPE_INFO = new BasicTypeInfo<Integer>(Integer.class, IntSerializer.INSTANCE, IntComparator.class);
-	public static final BasicTypeInfo<Long> LONG_TYPE_INFO = new BasicTypeInfo<Long>(Long.class, LongSerializer.INSTANCE, LongComparator.class);
-	public static final BasicTypeInfo<Float> FLOAT_TYPE_INFO = new BasicTypeInfo<Float>(Float.class, FloatSerializer.INSTANCE, FloatComparator.class);
-	public static final BasicTypeInfo<Double> DOUBLE_TYPE_INFO = new BasicTypeInfo<Double>(Double.class, DoubleSerializer.INSTANCE, DoubleComparator.class);
-	public static final BasicTypeInfo<Character> CHAR_TYPE_INFO = new BasicTypeInfo<Character>(Character.class, CharSerializer.INSTANCE, CharComparator.class);
-	public static final BasicTypeInfo<Date> DATE_TYPE_INFO = new BasicTypeInfo<Date>(Date.class, DateSerializer.INSTANCE, DateComparator.class);
-	public static final BasicTypeInfo<Void> VOID_TYPE_INFO = new BasicTypeInfo<Void>(Void.class, VoidSerializer.INSTANCE, null);
+	public static final BasicTypeInfo<String> STRING_TYPE_INFO = new BasicTypeInfo<String>(String.class, new Class<?>[]{}, StringSerializer.INSTANCE, StringComparator.class);
+	public static final BasicTypeInfo<Boolean> BOOLEAN_TYPE_INFO = new BasicTypeInfo<Boolean>(Boolean.class, new Class<?>[]{}, BooleanSerializer.INSTANCE, BooleanComparator.class);
+	public static final BasicTypeInfo<Byte> BYTE_TYPE_INFO = new IntegerTypeInfo<Byte>(Byte.class, new Class<?>[]{Short.class, Integer.class, Long.class, Float.class, Double.class, Character.class}, ByteSerializer.INSTANCE, ByteComparator.class);
+	public static final BasicTypeInfo<Short> SHORT_TYPE_INFO = new IntegerTypeInfo<Short>(Short.class, new Class<?>[]{Integer.class, Long.class, Float.class, Double.class, Character.class}, ShortSerializer.INSTANCE, ShortComparator.class);
+	public static final BasicTypeInfo<Integer> INT_TYPE_INFO = new IntegerTypeInfo<Integer>(Integer.class, new Class<?>[]{Long.class, Float.class, Double.class, Character.class}, IntSerializer.INSTANCE, IntComparator.class);
+	public static final BasicTypeInfo<Long> LONG_TYPE_INFO = new IntegerTypeInfo<Long>(Long.class, new Class<?>[]{Float.class, Double.class, Character.class}, LongSerializer.INSTANCE, LongComparator.class);
+	public static final BasicTypeInfo<Float> FLOAT_TYPE_INFO = new FractionalTypeInfo<Float>(Float.class, new Class<?>[]{Double.class}, FloatSerializer.INSTANCE, FloatComparator.class);
+	public static final BasicTypeInfo<Double> DOUBLE_TYPE_INFO = new FractionalTypeInfo<Double>(Double.class, new Class<?>[]{}, DoubleSerializer.INSTANCE, DoubleComparator.class);
+	public static final BasicTypeInfo<Character> CHAR_TYPE_INFO = new BasicTypeInfo<Character>(Character.class, new Class<?>[]{}, CharSerializer.INSTANCE, CharComparator.class);
+	public static final BasicTypeInfo<Date> DATE_TYPE_INFO = new BasicTypeInfo<Date>(Date.class, new Class<?>[]{}, DateSerializer.INSTANCE, DateComparator.class);
+	public static final BasicTypeInfo<Void> VOID_TYPE_INFO = new BasicTypeInfo<Void>(Void.class, new Class<?>[]{}, VoidSerializer.INSTANCE, null);
 	
 	// --------------------------------------------------------------------------------------------
 
 	private final Class<T> clazz;
 	
 	private final TypeSerializer<T> serializer;
+
+	private final Class<?>[] possibleCastTargetTypes;
 	
 	private final Class<? extends TypeComparator<T>> comparatorClass;
 	
 	
-	private BasicTypeInfo(Class<T> clazz, TypeSerializer<T> serializer, Class<? extends TypeComparator<T>> comparatorClass) {
+	protected BasicTypeInfo(Class<T> clazz, Class<?>[] possibleCastTargetTypes, TypeSerializer<T> serializer, Class<? extends TypeComparator<T>> comparatorClass) {
 		this.clazz = clazz;
+		this.possibleCastTargetTypes = possibleCastTargetTypes;
 		this.serializer = serializer;
 		this.comparatorClass = comparatorClass;
 	}
 	
 	// --------------------------------------------------------------------------------------------
-	
+
+	/**
+	 * Returns whether this type should be automatically cast to
+	 * the target type in an arithmetic operation.
+	 */
+	public boolean canCastTo(BasicTypeInfo<?> to) {
+		for (Class<?> possibleTo: possibleCastTargetTypes) {
+			if (possibleTo.equals(to.getTypeClass())) {
+				return true;
+			}
+		}
+		return false;
+	}
+
 	@Override
 	public boolean isBasicType() {
 		return true;

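The cast lattice encoded by the constants above can be probed directly; for example (usage
sketch, not part of this commit):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo._

    // Int may be implicitly widened to Long in an arithmetic expression,
    // but not the other way around:
    INT_TYPE_INFO.canCastTo(LONG_TYPE_INFO)   // true
    LONG_TYPE_INFO.canCastTo(INT_TYPE_INFO)   // false
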
http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java
new file mode 100644
index 0000000..7e5e95d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Type information for fractional primitive types (double, float).
+ */
+public class FractionalTypeInfo<T> extends NumericTypeInfo<T> {
+
+	protected FractionalTypeInfo(Class<T> clazz, Class<?>[] possibleCastTargetTypes, TypeSerializer<T> serializer, Class<? extends TypeComparator<T>> comparatorClass) {
+		super(clazz, possibleCastTargetTypes, serializer, comparatorClass);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java
new file mode 100644
index 0000000..5a7e304
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Type information for integer primitive types (int, long, byte, short, char).
+ */
+public class IntegerTypeInfo<T> extends NumericTypeInfo<T> {
+
+	protected IntegerTypeInfo(Class<T> clazz, Class<?>[] possibleCastTargetTypes, TypeSerializer<T> serializer, Class<? extends TypeComparator<T>> comparatorClass) {
+		super(clazz, possibleCastTargetTypes, serializer, comparatorClass);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java
new file mode 100644
index 0000000..0f598f4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Type information for numeric primitive types (int, long, double, byte, ...).
+ */
+public class NumericTypeInfo<T> extends BasicTypeInfo<T> {
+
+	protected NumericTypeInfo(Class<T> clazz, Class<?>[] possibleCastTargetTypes, TypeSerializer<T> serializer, Class<? extends
+			TypeComparator<T>> comparatorClass) {
+		super(clazz, possibleCastTargetTypes, serializer, comparatorClass);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
index 3ae06a3..83126ab 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
@@ -117,6 +117,15 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> {
 	public String toString() {
 		return arrayClass.getComponentType().getName() + "[]";
 	}
+
+	@Override
+	public boolean equals(Object other) {
+		if (other instanceof PrimitiveArrayTypeInfo) {
+			PrimitiveArrayTypeInfo otherArray = (PrimitiveArrayTypeInfo) other;
+			return otherArray.arrayClass == arrayClass;
+		}
+		return false;
+	}
+
+	@Override
+	public int hashCode() {
+		// must stay consistent with equals(), which compares only arrayClass
+		return arrayClass.hashCode();
+	}
 	
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
index beea169..4fa02e3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
 
 /**
  * TypeInformation is the core class of Flink's type system. Flink requires a type information
@@ -114,6 +116,16 @@ public abstract class TypeInformation<T> implements Serializable {
 	public abstract Class<T> getTypeClass();
 
 	/**
+	 * Returns the generic parameters of this type.
+	 *
+	 * @return The list of generic parameters. This list can be empty.
+	 */
+	public List<TypeInformation<?>> getGenericParameters() {
+		// Return an empty list as the default implementation
+		return new LinkedList<TypeInformation<?>>();
+	}
+
+	/**
 	 * Checks whether this type can be used as a key. As a bare minimum, types have
 	 * to be hashable and comparable to be keys.
 	 *  

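With this hook in place, parameterized type informations can expose their nested types. For
example, via the Scala API's `createTypeInformation` (a usage sketch; the override for case
classes and tuples is added in CaseClassTypeInfo below):

    import org.apache.flink.api.scala._

    val info = createTypeInformation[(String, Int)]
    // a java.util.List holding the type informations for String and Int
    val params = info.getGenericParameters
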
http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 30ff91a..2c067fd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -223,17 +223,17 @@ public abstract class Keys<T> {
 			} else {
 				groupingFields = rangeCheckFields(groupingFields, type.getArity() -1);
 			}
-			TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>)type;
+			CompositeType<?> compositeType = (CompositeType<?>) type;
 			Preconditions.checkArgument(groupingFields.length > 0, "Grouping fields can not be empty at this point");
 			
 			keyFields = new ArrayList<FlatFieldDescriptor>(type.getTotalFields());
 			// for each key, find the field:
 			for(int j = 0; j < groupingFields.length; j++) {
 				for(int i = 0; i < type.getArity(); i++) {
-					TypeInformation<?> fieldType = tupleType.getTypeAt(i);
+					TypeInformation<?> fieldType = compositeType.getTypeAt(i);
 					
 					if(groupingFields[j] == i) { // check if user set the key
-						int keyId = countNestedElementsBefore(tupleType, i) + i;
+						int keyId = countNestedElementsBefore(compositeType, i) + i;
 						if(fieldType instanceof TupleTypeInfoBase) {
 							TupleTypeInfoBase<?> tupleFieldType = (TupleTypeInfoBase<?>) fieldType;
 							tupleFieldType.addAllFields(keyId, keyFields);
@@ -248,13 +248,13 @@ public abstract class Keys<T> {
 			keyFields = removeNullElementsFromList(keyFields);
 		}
 		
-		private static int countNestedElementsBefore(TupleTypeInfoBase<?> tupleType, int pos) {
+		private static int countNestedElementsBefore(CompositeType<?> compositeType, int pos) {
 			if( pos == 0) {
 				return 0;
 			}
 			int ret = 0;
 			for (int i = 0; i < pos; i++) {
-				TypeInformation<?> fieldType = tupleType.getTypeAt(i);
+				TypeInformation<?> fieldType = compositeType.getTypeAt(i);
 				ret += fieldType.getTotalFields() -1;
 			}
 			return ret;
@@ -389,7 +389,7 @@ public abstract class Keys<T> {
 		int last = fields[0];
 
 		if (last < 0 || last > maxAllowedField) {
-			throw new IllegalArgumentException("Tuple position is out of range.");
+			throw new IllegalArgumentException("Tuple position is out of range: " + last);
 		}
 
 		for (; i < fields.length; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
index bf0e25a..91a7a5e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
@@ -18,12 +18,16 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
 import java.lang.reflect.Field;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
-public class PojoField {
-	public Field field;
+public class PojoField implements Serializable {
+	public transient Field field;
 	public TypeInformation<?> type;
 
 	public PojoField(Field field, TypeInformation<?> type) {
@@ -31,6 +35,35 @@ public class PojoField {
 		this.type = type;
 	}
 
+	private void writeObject(ObjectOutputStream out)
+			throws IOException, ClassNotFoundException {
+		out.defaultWriteObject();
+		out.writeObject(field.getDeclaringClass());
+		out.writeUTF(field.getName());
+	}
+
+	private void readObject(ObjectInputStream in)
+			throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		Class<?> clazz = (Class<?>)in.readObject();
+		String fieldName = in.readUTF();
+		field = null;
+		// try superclasses as well
+		while (clazz != null) {
+			try {
+				field = clazz.getDeclaredField(fieldName);
+				field.setAccessible(true);
+				break;
+			} catch (NoSuchFieldException e) {
+				clazz = clazz.getSuperclass();
+			}
+		}
+		if (field == null) {
+			throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup."
+					+ " (" + fieldName + ")");
+		}
+	}
+
 	@Override
 	public String toString() {
 		return "PojoField " + field.getDeclaringClass() + "." + field.getName() + " (" + type + ")";

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TreeGen.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TreeGen.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TreeGen.scala
index d883ba1..f6358d5 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TreeGen.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TreeGen.scala
@@ -68,13 +68,13 @@ private[flink] trait TreeGen[C <: Context] { this: MacroContextHolder[C] with Ty
     def mkCall(root: Tree, path: String*)(args: List[Tree]) = Apply(mkSelect(root, path: _*), args)
 
     def mkSeq(items: List[Tree]): Tree =
-      Apply(mkSelect("scala", "collection", "Seq", "apply"), items)
+      Apply(mkSelect("_root_", "scala", "collection", "Seq", "apply"), items)
 
     def mkList(items: List[Tree]): Tree =
-      Apply(mkSelect("scala", "collection", "immutable", "List", "apply"), items)
+      Apply(mkSelect("_root_", "scala", "collection", "immutable", "List", "apply"), items)
 
     def mkMap(items: List[Tree]): Tree =
-      Apply(mkSelect("scala", "collection", "immutable", "Map", "apply"), items)
+      Apply(mkSelect("_root_", "scala", "collection", "immutable", "Map", "apply"), items)
 
     def mkVal(name: String, flags: FlagSet, transient: Boolean, valTpe: Type, value: Tree): Tree = {
       ValDef(Modifiers(flags), newTermName(name), TypeTree(valTpe), value)

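The `_root_` prefix anchors the generated selects at the top-level package, so generated code
can no longer be broken by user code that shadows the `scala` package; for illustration (not
part of this commit):

    package org.example
    object scala // an unqualified `scala.collection.Seq` inside org.example
                 // would resolve against this object and fail to compile;
                 // `_root_.scala.collection.Seq` still resolves correctly
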
http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
index 5f35f2e..2c80433 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
@@ -30,6 +30,8 @@ import org.apache.flink.api.java.operators.Keys.ExpressionKeys
 import org.apache.flink.api.java.typeutils.{TupleTypeInfoBase, PojoTypeInfo}
 import PojoTypeInfo.NamedFlatFieldDescriptor
 
+import scala.collection.JavaConverters._
+
 /**
  * TypeInformation for Case Classes. Creation and access is different from
  * our Java Tuples so we have to treat them differently.
@@ -41,7 +43,10 @@ abstract class CaseClassTypeInfo[T <: Product](
     val fieldNames: Seq[String])
   extends TupleTypeInfoBase[T](clazz, fieldTypes: _*) {
 
-  def getGenericParameters = typeParamTypeInfos
+  override def getGenericParameters: java.util.List[TypeInformation[_]] = {
+    typeParamTypeInfos.toList.asJava
+  }
+
   private val REGEX_INT_FIELD: String = "[0-9]+"
   private val REGEX_STR_FIELD: String = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*"
   private val REGEX_FIELD: String = REGEX_STR_FIELD + "|" + REGEX_INT_FIELD
@@ -209,10 +214,6 @@ abstract class CaseClassTypeInfo[T <: Product](
       "\" in type " + this + ".")
   }
 
-  def getFieldIndices(fields: Array[String]): Array[Int] = {
-    fields map { x => fieldNames.indexOf(x) }
-  }
-
   override def getFieldNames: Array[String] = fieldNames.toArray
 
   override def getFieldIndex(fieldName: String): Int = {
@@ -224,7 +225,7 @@ abstract class CaseClassTypeInfo[T <: Product](
     }
   }
 
-  override def toString = clazz.getSimpleName + "(" + fieldNames.zip(types).map {
+  override def toString = clazz.getName + "(" + fieldNames.zip(types).map {
     case (n, t) => n + ": " + t}
     .mkString(", ") + ")"
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
index e2d3388..a1cded7 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
@@ -21,6 +21,8 @@ import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 
+import scala.collection.JavaConverters._
+
 /**
  * TypeInformation for [[Either]].
  */
@@ -36,6 +38,7 @@ class EitherTypeInfo[A, B, T <: Either[A, B]](
   override def getTotalFields: Int = 1
   override def getArity: Int = 1
   override def getTypeClass = clazz
+  override def getGenericParameters = List[TypeInformation[_]](leftTypeInfo, rightTypeInfo).asJava
 
   def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {
     val leftSerializer = if (leftTypeInfo != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
index 4d39f7f..510b604 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
@@ -21,6 +21,8 @@ import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 
+import scala.collection.JavaConverters._
+
 /**
  * TypeInformation for [[Option]].
  */
@@ -33,6 +35,8 @@ class OptionTypeInfo[A, T <: Option[A]](elemTypeInfo: TypeInformation[A])
   override def getTotalFields: Int = 1
   override def getArity: Int = 1
   override def getTypeClass = classOf[Option[_]].asInstanceOf[Class[T]]
+  override def getGenericParameters = List[TypeInformation[_]](elemTypeInfo).asJava
+
 
   def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {
     if (elemTypeInfo == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
index 96dc96d..76067bb 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
@@ -21,6 +21,8 @@ import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 
+import scala.collection.JavaConverters._
+
 import scala.collection.generic.CanBuildFrom
 
 /**
@@ -37,6 +39,8 @@ abstract class TraversableTypeInfo[T <: TraversableOnce[E], E](
   override def getTotalFields: Int = 1
   override def getArity: Int = 1
   override def getTypeClass: Class[T] = clazz
+  override def getGenericParameters = List[TypeInformation[_]](elementTypeInfo).asJava
+
 
   def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T]
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
index 1396b8e..3749b37 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
@@ -20,7 +20,8 @@ package org.apache.flink.api.scala.typeutils
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
+
+import scala.collection.JavaConverters._
 
 import scala.util.Try
 
@@ -36,6 +37,7 @@ class TryTypeInfo[A, T <: Try[A]](elemTypeInfo: TypeInformation[A])
   override def getTotalFields: Int = 1
   override def getArity: Int = 1
   override def getTypeClass = classOf[Try[_]].asInstanceOf[Class[T]]
+  override def getGenericParameters = List[TypeInformation[_]](elemTypeInfo).asJava
 
   def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {
     if (elemTypeInfo == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/pom.xml b/flink-staging/flink-expressions/pom.xml
new file mode 100644
index 0000000..800cc25
--- /dev/null
+++ b/flink-staging/flink-expressions/pom.xml
@@ -0,0 +1,234 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-staging</artifactId>
+		<version>0.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-expressions</artifactId>
+	<name>flink-expressions</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala-examples</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-reflect</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-library</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-compiler</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.scalamacros</groupId>
+			<artifactId>quasiquotes_${scala.binary.version}</artifactId>
+			<version>${scala.macros.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.1.4</version>
+				<executions>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+
+					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+						 scala classes can be resolved later in the (Java) test-compile phase -->
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<jvmArgs>
+						<jvmArg>-Xms128m</jvmArg>
+						<jvmArg>-Xmx512m</jvmArg>
+					</jvmArgs>
+					<compilerPlugins>
+						<compilerPlugin>
+							<groupId>org.scalamacros</groupId>
+							<artifactId>paradise_${scala.version}</artifactId>
+							<version>${scala.macros.version}</version>
+						</compilerPlugin>
+					</compilerPlugins>
+				</configuration>
+			</plugin>
+
+			<!-- Eclipse Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+
+			<!-- Adding scala source directories to build path -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<version>0.5.0</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<verbose>false</verbose>
+					<failOnViolation>true</failOnViolation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<failOnWarning>false</failOnWarning>
+					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+					<outputEncoding>UTF-8</outputEncoding>
+				</configuration>
+			</plugin>
+
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java b/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java
new file mode 100644
index 0000000..055eaee
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.java;
+
+
+import org.apache.flink.api.expressions.ExpressionOperation;
+import org.apache.flink.api.expressions.tree.EqualTo$;
+import org.apache.flink.api.expressions.tree.Expression;
+import org.apache.flink.api.expressions.tree.Literal$;
+import org.apache.flink.api.expressions.tree.UnresolvedFieldReference$;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
+
+/**
+ * This is extremely bare-bones. We need a parser that can parse expressions in a String
+ * and create the correct expression AST. Then we can use expressions like this:
+ *
+ * {@code in.select("'field0.avg, 'field1.count") }
+ */
+public class JavaExpressionExample {
+
+	public static class WC {
+		public String word;
+		public int count;
+
+		public WC() {
+
+		}
+
+		public WC(String word, int count) {
+			this.word = word;
+			this.count = count;
+		}
+
+		@Override
+		public String toString() {
+			return "WC " + word + " " + count;
+		}
+	}
+	public static void main(String[] args) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+
+		DataSet<WC> input = env.fromElements(
+				new WC("Hello", 1),
+				new WC("Ciao", 1),
+				new WC("Hello", 1)
+		);
+
+		ExpressionOperation<JavaBatchTranslator> expr = new JavaBatchTranslator().createExpressionOperation(
+				input,
+				new Expression[] { UnresolvedFieldReference$.MODULE$.apply("count"), UnresolvedFieldReference$.MODULE$.apply("word")});
+
+		ExpressionOperation<JavaBatchTranslator> filtered = expr.filter(
+				EqualTo$.MODULE$.apply(UnresolvedFieldReference$.MODULE$.apply("word"), Literal$.MODULE$.apply("Hello")));
+
+		DataSet<WC> result = (DataSet<WC>) filtered.as(TypeExtractor.createTypeInfo(WC.class));
+
+		result.print();
+		env.execute();
+	}
+}

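The `EqualTo$.MODULE$.apply(...)` calls are how Java reaches the `apply` methods on the Scala
case classes' companion objects. In Scala, the same filter collapses to the DSL form (sketch):

    val filtered = expr.filter('word === "Hello")
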
http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionException.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionException.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionException.scala
new file mode 100644
index 0000000..34e400f
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionException.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions
+
+/**
+ * Exception for all errors occurring during expression operations.
+ */
+class ExpressionException(msg: String) extends RuntimeException(msg)

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala
new file mode 100644
index 0000000..d843c5b
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.expressions.analysis.{GroupByAnalyzer, SelectionAnalyzer,
+PredicateAnalyzer}
+import org.apache.flink.api.expressions.operations._
+import org.apache.flink.api.expressions.tree.{ResolvedFieldReference,
+UnresolvedFieldReference, Expression}
+
+/**
+ * The abstraction for writing expression API programs. Similar to how the batch and streaming APIs
+ * have [[org.apache.flink.api.scala.DataSet]] and
+ * [[org.apache.flink.streaming.api.scala.DataStream]].
+ *
+ * Use the methods of [[ExpressionOperation]] to transform data or to revert back to the underlying
+ * batch or streaming representation.
+ */
+case class ExpressionOperation[A <: OperationTranslator](
+    private[flink] val operation: Operation,
+    private[flink] val operationTranslator: A) {
+
+
+  /**
+   * Converts the result of this operation back to a [[org.apache.flink.api.scala.DataSet]] or
+   * [[org.apache.flink.streaming.api.scala.DataStream]].
+   */
+  def as[O](implicit tpe: TypeInformation[O]): operationTranslator.Representation[O] = {
+    operationTranslator.translate(operation)
+  }
+
+  /**
+   * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
+   * can contain complex expressions and aggregations.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.select('key, 'value.avg + " The average" as 'average, 'other.substring(0, 10))
+   * }}}
+   */
+  def select(fields: Expression*): ExpressionOperation[A] = {
+    val analyzer = new SelectionAnalyzer(operation.outputFields)
+    val analyzedFields = fields.map(analyzer.analyze)
+    val fieldNames = analyzedFields map(_.name)
+    if (fieldNames.toSet.size != fieldNames.size) {
+      throw new ExpressionException(s"Resulting field names are not unique in expression" +
+        s""" "${fields.mkString(", ")}".""")
+    }
+    this.copy(operation = Select(operation, analyzedFields))
+  }
+
+  /**
+   * Renames the fields of the expression result. Use this to disambiguate fields before
+   * joining two operations.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.as('a, 'b)
+   * }}}
+   */
+  def as(fields: Expression*): ExpressionOperation[A] = {
+    if (!fields.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+      throw new ExpressionException("Only field expressions allowed in as().")
+    }
+    this.copy(operation = As(operation, fields.toArray map { _.name }))
+  }
+
+  /**
+   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
+   * clause.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.filter('name === "Fred")
+   * }}}
+   */
+  def filter(predicate: Expression): ExpressionOperation[A] = {
+    val analyzer = new PredicateAnalyzer(operation.outputFields)
+    val analyzedPredicate = analyzer.analyze(predicate)
+    this.copy(operation = Filter(operation, analyzedPredicate))
+  }
+
+  /**
+   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
+   * clause.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.filter('name === "Fred")
+   * }}}
+   */
+  def where(predicate: Expression): ExpressionOperation[A] = {
+    filter(predicate)
+  }
+
+  /**
+   * Groups the elements on some grouping keys. Use this before a selection with aggregations
+   * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.groupBy('key).select('key, 'value.avg)
+   * }}}
+   */
+  def groupBy(fields: Expression*): ExpressionOperation[A] = {
+    val analyzer = new GroupByAnalyzer(operation.outputFields)
+    val analyzedFields = fields.map(analyzer.analyze)
+
+    val illegalKeys = analyzedFields filter {
+      case _: ResolvedFieldReference => false // OK
+      case _ => true
+    }
+
+    if (illegalKeys.nonEmpty) {
+      throw new ExpressionException("Illegal key expressions: " + illegalKeys.mkString(", "))
+    }
+
+    this.copy(operation = GroupBy(operation, analyzedFields))
+  }
+
+  /**
+   * Joins two expression operations. Similar to a SQL join. The fields of the two joined
+   * operations must not overlap; use [[as]] to rename fields if necessary.
+   *
+   * Example:
+   *
+   * {{{
+   *   left.join(right).where('a === 'b && 'c > 3).select('a, 'b, 'd)
+   * }}}
+   */
+  def join(right: ExpressionOperation[A]): ExpressionOperation[A] = {
+    val leftInputNames = operation.outputFields.map(_._1).toSet
+    val rightInputNames = right.operation.outputFields.map(_._1).toSet
+    if (leftInputNames.intersect(rightInputNames).nonEmpty) {
+      throw new ExpressionException(
+        "Overlapping fields names on join input, result would be ambiguous: " +
+          operation.outputFields.mkString(", ") +
+          " and " +
+          right.operation.outputFields.mkString(", ") )
+    }
+    this.copy(operation = Join(operation, right.operation))
+  }
+
+  override def toString: String = s"Expression($operation)"
+}
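
For illustration, a sketch of how these methods chain together (the two inputs and
their field names are assumed, and the Symbol-based expression syntax is the one used
in the Scaladoc examples above):

    def example[A <: OperationTranslator](
        orders: ExpressionOperation[A],      // assumed to have three fields
        customers: ExpressionOperation[A]) = // assumed to have two fields
      orders.as('oId, 'cId, 'amount)
        .join(customers.as('name, 'custId))
        .where('cId === 'custId && 'amount > 10)
        .groupBy('name)
        .select('name, 'amount.avg as 'avgAmount)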

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/Row.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/Row.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/Row.scala
new file mode 100644
index 0000000..47ef59e
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/Row.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions
+
+/**
+ * This is the row type used for executing expression operations. We use manually created
+ * TypeInformation to check the field types and to create serializers and comparators.
+ */
+class Row(arity: Int) extends Product {
+
+  private val fields = new Array[Any](arity)
+
+  def productArity = fields.length
+
+  def productElement(i: Int): Any = fields(i)
+
+  def setField(i: Int, value: Any): Unit = fields(i) = value
+
+  def canEqual(that: Any) = false
+
+  override def toString = fields.mkString(",")
+
+}
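
For illustration, a short usage sketch of Row:

    val row = new Row(2)
    row.setField(0, "Fred")
    row.setField(1, 42)
    row.productElement(1)  // 42
    row.toString           // "Fred,42"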

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Analyzer.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Analyzer.scala
new file mode 100644
index 0000000..da71cdd
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Analyzer.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.analysis
+
+import org.apache.flink.api.expressions.tree.Expression
+
+/**
+ * Base class for expression analyzers/transformers. Analyzers must implement method `rules` to
+ * provide the chain of rules that are invoked one after another. The expression resulting
+ * from one rule is fed into the next rule and the final result is returned from method `analyze`.
+ */
+abstract class Analyzer {
+
+  def rules: Seq[Rule]
+
+  final def analyze(expr: Expression): Expression = {
+    var currentTree = expr
+    for (rule <- rules) {
+      currentTree = rule(currentTree)
+    }
+    currentTree
+  }
+}
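
For illustration, a minimal sketch of a custom analyzer (this particular combination of
rules is made up; ResolveFieldReferences and TypeCheck are defined later in this commit):

    class MyAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) extends Analyzer {
      // rules run in order; each rule receives the previous rule's output
      def rules = Seq(new ResolveFieldReferences(inputFields), new TypeCheck)
    }

    val analyzed = new MyAnalyzer(Seq(("a", BasicTypeInfo.INT_TYPE_INFO)))
      .analyze(UnresolvedFieldReference("a"))
    // analyzed is now ResolvedFieldReference("a", Int)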

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ExtractEquiJoinFields.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ExtractEquiJoinFields.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ExtractEquiJoinFields.scala
new file mode 100644
index 0000000..a4f8f25
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ExtractEquiJoinFields.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.analysis
+
+import org.apache.flink.api.expressions.tree._
+import org.apache.flink.api.common.typeutils.CompositeType
+
+import scala.collection.mutable
+
+/**
+ * Equi-join field extractor for join predicates and CoGroup predicates. The result is a modified
+ * expression without the equi-join predicates together with indices of the join fields
+ * from both the left and right input.
+ */
+object ExtractEquiJoinFields {
+  def apply(leftType: CompositeType[_], rightType: CompositeType[_], predicate: Expression) = {
+
+    val joinFieldsLeft = mutable.MutableList[Int]()
+    val joinFieldsRight = mutable.MutableList[Int]()
+
+    val equiJoinExprs = mutable.MutableList[EqualTo]()
+    // First get all `===` expressions that are not below an `Or`
+    predicate.transformPre {
+      case or@Or(_, _) => NopExpression()
+      case eq@EqualTo(le: ResolvedFieldReference, re: ResolvedFieldReference) =>
+        if (leftType.hasField(le.name) && rightType.hasField(re.name)) {
+          joinFieldsLeft += leftType.getFieldIndex(le.name)
+          joinFieldsRight += rightType.getFieldIndex(re.name)
+          equiJoinExprs += eq
+        } else if (leftType.hasField(re.name) && rightType.hasField(le.name)) {
+          joinFieldsLeft += leftType.getFieldIndex(re.name)
+          joinFieldsRight += rightType.getFieldIndex(le.name)
+          equiJoinExprs += eq
+        } else {
+          // not an equi-join predicate, leave it in the expression
+        }
+        eq
+    }
+
+    // then remove the equi join expressions from the predicate
+    val resultExpr = predicate.transformPost {
+      // For OR, we can eliminate the OR since the equi join
+      // predicate is evaluated before the expression is evaluated
+      case or@Or(NopExpression(), _) => NopExpression()
+      case or@Or(_, NopExpression()) => NopExpression()
+      // For AND we replace it with the other expression, since the
+      // equi join predicate will always be true
+      case and@And(NopExpression(), other) => other
+      case and@And(other, NopExpression()) => other
+      case eq : EqualTo if equiJoinExprs.contains(eq) =>
+        NopExpression()
+    }
+
+    (resultExpr, joinFieldsLeft.toArray, joinFieldsRight.toArray)
+  }
+}
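
For illustration, a sketch of the extractor's result (leftType, rightType and predicate
stand for already-analyzed inputs; assume the predicate is 'a === 'b && 'c > 3 with 'a
from the left input and 'b from the right):

    val (remaining, leftKeys, rightKeys) =
      ExtractEquiJoinFields(leftType, rightType, predicate)
    // remaining is 'c > 3: the equi-join part became a NopExpression and the
    // enclosing And collapsed to the other operand
    // leftKeys/rightKeys hold the index of 'a in leftType and of 'b in rightType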

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/GroupByAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/GroupByAnalyzer.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/GroupByAnalyzer.scala
new file mode 100644
index 0000000..21f989c
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/GroupByAnalyzer.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.analysis
+
+import org.apache.flink.api.expressions._
+import org.apache.flink.api.expressions.tree.{ResolvedFieldReference, Expression}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/**
+ * Analyzer for grouping expressions. Only field expressions are allowed as grouping expressions.
+ */
+class GroupByAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) extends Analyzer {
+
+  def rules = Seq(new ResolveFieldReferences(inputFields), CheckGroupExpression)
+
+  object CheckGroupExpression extends Rule {
+
+    def apply(expr: Expression) = {
+      expr match {
+        case _: ResolvedFieldReference => // this is OK
+        case _ =>
+          throw new ExpressionException(
+            s"""Invalid grouping expression "$expr". Only field references are allowed.""")
+      }
+      expr
+    }
+  }
+}
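
For illustration, a sketch of the analyzer's behavior (the field name and type are made up):

    val analyzer = new GroupByAnalyzer(Seq(("key", BasicTypeInfo.STRING_TYPE_INFO)))

    analyzer.analyze(UnresolvedFieldReference("key"))
    // returns ResolvedFieldReference("key", String), a valid grouping key

    // anything more complex than a field reference, e.g. 'key + 1, is rejected
    // with "Invalid grouping expression"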

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/InsertAutoCasts.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/InsertAutoCasts.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/InsertAutoCasts.scala
new file mode 100644
index 0000000..319e72f
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/InsertAutoCasts.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.expressions.analysis
+
+import org.apache.flink.api.expressions.tree._
+import org.apache.flink.api.common.typeinfo.{IntegerTypeInfo, BasicTypeInfo}
+
+/**
+ * [[Rule]] that inserts automatic casts into arithmetic, comparison and bitwise operations
+ * whose operand types differ.
+ */
+class InsertAutoCasts extends Rule {
+
+  def apply(expr: Expression) = {
+    val result = expr.transformPost {
+
+      case plus@Plus(o1, o2) =>
+        // Plus is special case since we can cast anything to String for String concat
+        if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) {
+          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+            o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+            Plus(Cast(o1, o2.typeInfo), o2)
+          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+            o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+            Plus(o1, Cast(o2, o1.typeInfo))
+          } else if (o1.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
+            Plus(o1, Cast(o2, BasicTypeInfo.STRING_TYPE_INFO))
+          } else if (o2.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
+            Plus(Cast(o1, BasicTypeInfo.STRING_TYPE_INFO), o2)
+          } else {
+            plus
+          }
+        } else {
+          plus
+        }
+
+      case ba: BinaryExpression if ba.isInstanceOf[BinaryArithmetic] ||
+        ba.isInstanceOf[BinaryComparison] =>
+        val o1 = ba.left
+        val o2 = ba.right
+        if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) {
+          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+            o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+            ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2))
+          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+            o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+            ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo)))
+          } else {
+            ba
+          }
+        } else {
+          ba
+        }
+
+      case ba: BinaryExpression if ba.isInstanceOf[BitwiseBinaryArithmetic] =>
+        val o1 = ba.left
+        val o2 = ba.right
+        if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isInstanceOf[IntegerTypeInfo[_]] &&
+          o2.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+            o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+            ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2))
+          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+            o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+            ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo)))
+          } else {
+            ba
+          }
+        } else {
+          ba
+        }
+    }
+
+    result
+  }
+}
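
For illustration, the effect sketched on resolved expressions (the operand types in the
comments are assumptions):

    val rewritten = new InsertAutoCasts().apply(expr)
    // with 'i: Int, 'd: Double, 's: String
    //   'i + 'd   becomes   Cast('i, Double) + 'd   (Int is castable to Double)
    //   'i + 's   becomes   Cast('i, String) + 's   (Plus falls back to String concat)
    //   'i < 'd   becomes   Cast('i, Double) < 'd   (comparisons are rewritten the same way)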

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala
new file mode 100644
index 0000000..2531fff
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.analysis
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/**
+ * Analyzer for unary predicates, i.e. filter operations.
+ */
+class PredicateAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) extends Analyzer {
+  def rules = Seq(
+    new ResolveFieldReferences(inputFields),
+    new InsertAutoCasts,
+    new TypeCheck,
+    new VerifyBoolean)
+}
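
For illustration, a sketch of the full predicate pipeline (field names and types are
made up; the Symbol syntax is the one used in the Scaladoc examples above):

    val analyzer = new PredicateAnalyzer(Seq(
      ("name", BasicTypeInfo.STRING_TYPE_INFO),
      ("age", BasicTypeInfo.INT_TYPE_INFO)))

    // resolves fields, inserts casts, type-checks, and verifies a boolean result:
    val predicate = analyzer.analyze('age > 18)

    // a non-boolean expression is rejected by VerifyBoolean:
    // analyzer.analyze('age + 1) throws ExpressionException (... is not boolean.)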

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ResolveFieldReferences.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ResolveFieldReferences.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ResolveFieldReferences.scala
new file mode 100644
index 0000000..693dd88
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ResolveFieldReferences.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.analysis
+
+import org.apache.flink.api.expressions.tree.{ResolvedFieldReference,
+  UnresolvedFieldReference, Expression}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.expressions._
+
+import scala.collection.mutable
+
+/**
+ * Rule that resolves field references. This rule verifies that field references point to existing
+ * fields of the input operation and creates [[ResolvedFieldReference]]s that hold the field
+ * [[TypeInformation]] in addition to the field name.
+ */
+class ResolveFieldReferences(inputFields: Seq[(String, TypeInformation[_])]) extends Rule {
+
+  def apply(expr: Expression) = {
+    val errors = mutable.MutableList[String]()
+
+    val result = expr.transformPost {
+      case fe@UnresolvedFieldReference(fieldName) =>
+        inputFields.find { _._1 == fieldName } match {
+          case Some((_, tpe)) => ResolvedFieldReference(fieldName, tpe)
+
+          case None =>
+            errors +=
+              s"Field '$fieldName' is not valid for input fields ${inputFields.mkString(",")}"
+            fe
+        }
+    }
+
+    if (errors.nonEmpty) {
+      throw new ExpressionException(
+        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
+    }
+
+    result
+  }
+}
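
For illustration, a sketch of the rule in isolation (the field name and type are made up):

    val rule = new ResolveFieldReferences(Seq(("name", BasicTypeInfo.STRING_TYPE_INFO)))

    rule(UnresolvedFieldReference("name"))
    // returns ResolvedFieldReference("name", String)

    // rule(UnresolvedFieldReference("nope")) throws ExpressionException, roughly:
    //   Field 'nope' is not valid for input fields (name,String)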

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Rule.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Rule.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Rule.scala
new file mode 100644
index 0000000..853ee7a
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Rule.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.analysis
+
+import org.apache.flink.api.expressions.tree.Expression
+
+/**
+ * Base class for a rule that is part of an [[Analyzer]] rule chain. Method `apply` receives an
+ * [[Expression]] and must return an [[Expression]]. The returned [[Expression]] can also be
+ * the input [[Expression]]. In an [[Analyzer]] rule chain the result [[Expression]] of one
+ * [[Rule]] is fed into the next [[Rule]] in the chain.
+ */
+abstract class Rule {
+  def apply(expr: Expression): Expression
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/SelectionAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/SelectionAnalyzer.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/SelectionAnalyzer.scala
new file mode 100644
index 0000000..eca007f
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/SelectionAnalyzer.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.analysis
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/**
+ * Analyzer for selection expressions, i.e. the field expressions of a select() call.
+ */
+class SelectionAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) extends Analyzer {
+
+  def rules = Seq(
+    new ResolveFieldReferences(inputFields),
+    new VerifyNoNestedAggregates,
+    new InsertAutoCasts,
+    new TypeCheck)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/TypeCheck.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/TypeCheck.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/TypeCheck.scala
new file mode 100644
index 0000000..632daa3
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/TypeCheck.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.expressions.analysis
+
+import org.apache.flink.api.expressions.tree.Expression
+import org.apache.flink.api.expressions._
+
+import scala.collection.mutable
+
+/**
+ * Rule that makes sure we call [[Expression.typeInfo]] on each [[Expression]] at least once.
+ * Expressions are expected to perform type verification in this method.
+ */
+class TypeCheck extends Rule {
+
+  def apply(expr: Expression) = {
+    val errors = mutable.MutableList[String]()
+
+    val result = expr.transformPre {
+      case e: Expression =>
+        // simply get the typeInfo from the expression; this performs the type analysis
+        try {
+          e.typeInfo
+        } catch {
+          case ex: ExpressionException =>
+            errors += ex.getMessage
+        }
+        e
+    }
+
+    if (errors.nonEmpty) {
+      throw new ExpressionException(
+        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
+    }
+
+    result
+  }
+}
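
For illustration (resolvedExpr stands for an expression whose field references are
already resolved):

    val checked = new TypeCheck().apply(resolvedExpr)
    // typeInfo is forced on every subtree; all failures are collected and reported
    // together in a single ExpressionException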

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyBoolean.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyBoolean.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyBoolean.scala
new file mode 100644
index 0000000..d0bd6b6
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyBoolean.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.expressions.analysis
+
+import org.apache.flink.api.expressions.tree.{NopExpression, Expression}
+import org.apache.flink.api.expressions._
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+/**
+ * [[Rule]] that verifies that the result type of an [[Expression]] is Boolean. This is required
+ * for filter/join predicates.
+ */
+class VerifyBoolean extends Rule {
+
+  def apply(expr: Expression) = {
+    if (!expr.isInstanceOf[NopExpression] && expr.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+      throw new ExpressionException(s"Expression $expr of type ${expr.typeInfo} is not boolean.")
+    }
+
+    expr
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoNestedAggregates.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoNestedAggregates.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoNestedAggregates.scala
new file mode 100644
index 0000000..de5063a
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoNestedAggregates.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.expressions.analysis
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.expressions.tree.{Expression, Aggregation}
+
+import scala.collection.mutable
+
+/**
+ * Rule that verifies that an expression does not contain aggregate operations
+ * as children of aggregate operations.
+ */
+class VerifyNoNestedAggregates extends Rule {
+
+  def apply(expr: Expression) = {
+    val errors = mutable.MutableList[String]()
+
+    val result = expr.transformPre {
+      case agg: Aggregation =>
+        if (agg.child.exists(_.isInstanceOf[Aggregation])) {
+          errors += s"""Found nested aggregation inside "$agg"."""
+        }
+        agg
+    }
+
+    if (errors.nonEmpty) {
+      throw new ExpressionException(
+        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
+    }
+
+    result
+  }
+}
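
For illustration, using the avg aggregation from the Scaladoc examples above (whether
.avg is available on arbitrary expressions depends on the DSL; the nested form is
purely hypothetical):

    val rule = new VerifyNoNestedAggregates
    rule('value.avg)           // OK: a single level of aggregation
    // rule(('value.avg).avg)  // would throw: Found nested aggregation inside "..."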

