flink-commits mailing list archives

From aljos...@apache.org
Subject [5/6] flink git commit: [FLINK-947] Add a declarative Expression API
Date Mon, 23 Feb 2015 15:05:29 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/ExpressionCodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/ExpressionCodeGenerator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/ExpressionCodeGenerator.scala
new file mode 100644
index 0000000..3e1ce0c
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/ExpressionCodeGenerator.scala
@@ -0,0 +1,630 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.codegen
+
+import java.util.concurrent.atomic.AtomicInteger
+import org.apache.flink.api.expressions.tree._
+import org.apache.flink.api.expressions.typeinfo.{RenamingProxyTypeInfo, RowTypeInfo}
+import org.apache.flink.api.expressions.{ExpressionException, tree}
+import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, BasicTypeInfo, TypeInformation}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, PojoTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/** Base class for all code generation classes. This provides the functionality for generating
+  * code from an [[Expression]] tree. Derived classes embed the generated code in a lambda
+  * function to form an executable code block.
+  *
+  * @param inputs List of input variable names with corresponding [[TypeInformation]].
+  * @param nullCheck Whether the generated code should include checks for NULL values.
+  * @param cl The ClassLoader that is used to create the Scala reflection ToolBox.
+  * @tparam R The type of the generated code block. In most cases a lambda function such
+  *           as "(IN1, IN2) => OUT".
+  */
+abstract class ExpressionCodeGenerator[R](
+    inputs: Seq[(String, CompositeType[_])],
+    val nullCheck: Boolean = false,
+    cl: ClassLoader) {
+  protected val log = LoggerFactory.getLogger(classOf[ExpressionCodeGenerator[_]])
+
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  if (cl == null) {
+    throw new IllegalArgumentException("ClassLoader must not be null.")
+  }
+
+  import scala.tools.reflect.ToolBox
+  protected val (mirror, toolBox) = ReflectionLock.synchronized {
+    val mirror = runtimeMirror(cl)
+    (mirror, mirror.mkToolBox())
+  }
+
+  // This is to be implemented by subclasses. We structure it this way so that generation
+  // is only triggered from generate(), i.e. while holding the Scala reflection lock.
+  protected def generateInternal(): R
+
+  final def generate(): R = {
+    ReflectionLock.synchronized {
+      generateInternal()
+    }
+  }
+
+  val cache = mutable.HashMap[Expression, GeneratedExpression]()
+
+  protected def generateExpression(expr: Expression): GeneratedExpression = {
+    // doesn't work yet, because we insert the same code twice and reuse variable names
+//    cache.getOrElseUpdate(expr, generateExpressionInternal(expr))
+    generateExpressionInternal(expr)
+  }
+
+  protected def generateExpressionInternal(expr: Expression): GeneratedExpression = {
+//  protected def generateExpression(expr: Expression): GeneratedExpression = {
+    val nullTerm = freshTermName("isNull")
+    val resultTerm = freshTermName("result")
+
+    // Generates code for binary expressions that must only be evaluated when both operands
+    // are non-null. This writes to nullTerm and resultTerm, so don't reuse those term names
+    // after calling this function.
+    def generateIfNonNull(left: Expression, right: Expression, resultType: TypeInformation[_])
+                         (expr: (TermName, TermName) => Tree): Seq[Tree] = {
+      val leftCode = generateExpression(left)
+      val rightCode = generateExpression(right)
+
+
+      if (nullCheck) {
+        leftCode.code ++ rightCode.code ++ q"""
+        val $nullTerm = ${leftCode.nullTerm} || ${rightCode.nullTerm}
+        val $resultTerm = if ($nullTerm) {
+          ${defaultPrimitive(resultType)}
+        } else {
+          ${expr(leftCode.resultTerm, rightCode.resultTerm)}
+        }
+        """.children
+      } else {
+        leftCode.code ++ rightCode.code :+ q"""
+        val $resultTerm = ${expr(leftCode.resultTerm, rightCode.resultTerm)}
+        """
+      }
+    }
+
+    val cleanedExpr = expr match {
+      case tree.Naming(namedExpr, _) => namedExpr
+      case _ => expr
+    }
+
+    val code: Seq[Tree] = cleanedExpr match {
+
+      case tree.Literal(null, typeInfo) =>
+        if (nullCheck) {
+          q"""
+            val $nullTerm = true
+            val $resultTerm = null
+          """.children
+        } else {
+          Seq(q"""
+            val $resultTerm = null
+          """)
+        }
+
+      case tree.Literal(intValue: Int, INT_TYPE_INFO) =>
+        if (nullCheck) {
+          q"""
+            val $nullTerm = false
+            val $resultTerm = $intValue
+          """.children
+        } else {
+          Seq(q"""
+            val $resultTerm = $intValue
+          """)
+        }
+
+      case tree.Literal(longValue: Long, LONG_TYPE_INFO) =>
+        if (nullCheck) {
+          q"""
+            val $nullTerm = false
+            val $resultTerm = $longValue
+          """.children
+        } else {
+          Seq(q"""
+            val $resultTerm = $longValue
+          """)
+        }
+
+
+      case tree.Literal(doubleValue: Double, DOUBLE_TYPE_INFO) =>
+        if (nullCheck) {
+          q"""
+            val $nullTerm = false
+            val $resultTerm = $doubleValue
+          """.children
+        } else {
+          Seq(q"""
+              val $resultTerm = $doubleValue
+          """)
+        }
+
+      case tree.Literal(floatValue: Float, FLOAT_TYPE_INFO) =>
+        if (nullCheck) {
+          q"""
+            val $nullTerm = false
+            val $resultTerm = $floatValue
+          """.children
+        } else {
+          Seq(q"""
+              val $resultTerm = $floatValue
+          """)
+        }
+
+      case tree.Literal(strValue: String, STRING_TYPE_INFO) =>
+        if (nullCheck) {
+          q"""
+            val $nullTerm = false
+            val $resultTerm = $strValue
+          """.children
+        } else {
+          Seq(q"""
+              val $resultTerm = $strValue
+          """)
+        }
+
+      case tree.Literal(boolValue: Boolean, BOOLEAN_TYPE_INFO) =>
+        if (nullCheck) {
+          q"""
+            val $nullTerm = false
+            val $resultTerm = $boolValue
+          """.children
+        } else {
+          Seq(q"""
+              val $resultTerm = $boolValue
+          """)
+        }
+
+      case Substring(str, beginIndex, endIndex) =>
+        val strCode = generateExpression(str)
+        val beginIndexCode = generateExpression(beginIndex)
+        val endIndexCode = generateExpression(endIndex)
+        if (nullCheck) {
+          strCode.code ++ beginIndexCode.code ++ endIndexCode.code ++ q"""
+            val $nullTerm =
+              ${strCode.nullTerm} || ${beginIndexCode.nullTerm} || ${endIndexCode.nullTerm}
+            val $resultTerm = if ($nullTerm) {
+              ${defaultPrimitive(str.typeInfo)}
+            } else {
+              if (${endIndexCode.resultTerm} == Int.MaxValue) {
+                (${strCode.resultTerm}).substring(${beginIndexCode.resultTerm})
+              } else {
+                (${strCode.resultTerm}).substring(
+                  ${beginIndexCode.resultTerm},
+                  ${endIndexCode.resultTerm})
+              }
+            }
+          """.children
+        } else {
+          strCode.code ++ beginIndexCode.code ++ endIndexCode.code :+ q"""
+            val $resultTerm = if (${endIndexCode.resultTerm} == Int.MaxValue) {
+              (${strCode.resultTerm}).substring(${beginIndexCode.resultTerm})
+            } else {
+              (${strCode.resultTerm}).substring(
+                ${beginIndexCode.resultTerm},
+                ${endIndexCode.resultTerm})
+            }
+          """
+        }
+
+      case tree.Cast(child: Expression, STRING_TYPE_INFO) =>
+        val childGen = generateExpression(child)
+        val castCode = if (nullCheck) {
+          q"""
+            val $nullTerm = ${childGen.nullTerm}
+            val $resultTerm = if ($nullTerm) {
+              null
+            } else {
+              ${childGen.resultTerm}.toString
+            }
+          """.children
+        } else {
+          Seq(q"""
+            val $resultTerm = ${childGen.resultTerm}.toString
+          """)
+        }
+        childGen.code ++ castCode
+
+      case tree.Cast(child: Expression, INT_TYPE_INFO) =>
+        val childGen = generateExpression(child)
+        val castCode = if (nullCheck) {
+          q"""
+            val $nullTerm = ${childGen.nullTerm}
+            val $resultTerm = ${childGen.resultTerm}.toInt
+          """.children
+        } else {
+          Seq(q"""
+            val $resultTerm = ${childGen.resultTerm}.toInt
+          """)
+        }
+        childGen.code ++ castCode
+
+      case tree.Cast(child: Expression, LONG_TYPE_INFO) =>
+        val childGen = generateExpression(child)
+        val castCode = if (nullCheck) {
+          q"""
+            val $nullTerm = ${childGen.nullTerm}
+            val $resultTerm = ${childGen.resultTerm}.toLong
+          """.children
+        } else {
+          Seq(q"""
+            val $resultTerm = ${childGen.resultTerm}.toLong
+          """)
+        }
+        childGen.code ++ castCode
+
+      case tree.Cast(child: Expression, FLOAT_TYPE_INFO) =>
+        val childGen = generateExpression(child)
+        val castCode = if (nullCheck) {
+          q"""
+            val $nullTerm = ${childGen.nullTerm}
+            val $resultTerm = ${childGen.resultTerm}.toFloat
+          """.children
+        } else {
+          Seq(q"""
+            val $resultTerm = ${childGen.resultTerm}.toFloat
+          """)
+        }
+        childGen.code ++ castCode
+
+      case tree.Cast(child: Expression, DOUBLE_TYPE_INFO) =>
+        val childGen = generateExpression(child)
+        val castCode = if (nullCheck) {
+          q"""
+            val $nullTerm = ${childGen.nullTerm}
+            val $resultTerm = ${childGen.resultTerm}.toDouble
+          """.children
+        } else {
+          Seq(q"""
+            val $resultTerm = ${childGen.resultTerm}.toDouble
+          """)
+        }
+        childGen.code ++ castCode
+
+      case ResolvedFieldReference(fieldName, fieldTpe: TypeInformation[_]) =>
+        inputs find { i => i._2.hasField(fieldName) } match {
+          case Some((inputName, inputTpe)) =>
+            val fieldCode = getField(newTermName(inputName), inputTpe, fieldName, fieldTpe)
+            if (nullCheck) {
+              q"""
+                val $resultTerm = $fieldCode
+                val $nullTerm = $resultTerm == null
+              """.children
+            } else {
+              Seq(q"""
+                val $resultTerm = $fieldCode
+              """)
+            }
+
+          case None => throw new ExpressionException("Could not get accessor for " + fieldName
+            + " in inputs " + inputs.mkString(", ") + ".")
+        }
+
+      case GreaterThan(left, right) =>
+        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
+          (leftTerm, rightTerm) => q"$leftTerm > $rightTerm"
+        }
+
+      case GreaterThanOrEqual(left, right) =>
+        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
+          (leftTerm, rightTerm) => q"$leftTerm >= $rightTerm"
+        }
+
+      case LessThan(left, right) =>
+        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
+          (leftTerm, rightTerm) => q"$leftTerm < $rightTerm"
+        }
+
+      case LessThanOrEqual(left, right) =>
+        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
+          (leftTerm, rightTerm) => q"$leftTerm <= $rightTerm"
+        }
+
+      case EqualTo(left, right) =>
+        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
+          (leftTerm, rightTerm) => q"$leftTerm == $rightTerm"
+        }
+
+      case NotEqualTo(left, right) =>
+        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
+          (leftTerm, rightTerm) => q"$leftTerm != $rightTerm"
+        }
+
+      case And(left, right) =>
+        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
+          (leftTerm, rightTerm) => q"$leftTerm && $rightTerm"
+        }
+
+      case Or(left, right) =>
+        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
+          (leftTerm, rightTerm) => q"$leftTerm || $rightTerm"
+        }
+
+      case Plus(left, right) =>
+        generateIfNonNull(left, right, expr.typeInfo) {
+          (leftTerm, rightTerm) => q"$leftTerm + $rightTerm"
+        }
+
+      case Minus(left, right) =>
+        generateIfNonNull(left, right, expr.typeInfo) {
+          (leftTerm, rightTerm) => q"$leftTerm - $rightTerm"
+        }
+
+      case Div(left, right) =>
+        generateIfNonNull(left, right, expr.typeInfo) {
+          (leftTerm, rightTerm) => q"$leftTerm / $rightTerm"
+        }
+
+      case Mul(left, right) =>
+        generateIfNonNull(left, right, expr.typeInfo) {
+          (leftTerm, rightTerm) => q"$leftTerm * $rightTerm"
+        }
+
+      case Mod(left, right) =>
+        generateIfNonNull(left, right, expr.typeInfo) {
+          (leftTerm, rightTerm) => q"$leftTerm % $rightTerm"
+        }
+
+      case UnaryMinus(child) =>
+        val childCode = generateExpression(child)
+        if (nullCheck) {
+          childCode.code ++ q"""
+            val $nullTerm = ${childCode.nullTerm}
+            val $resultTerm = if ($nullTerm) {
+              ${defaultPrimitive(child.typeInfo)}
+            } else {
+              -(${childCode.resultTerm})
+            }
+          """.children
+        } else {
+          childCode.code :+ q"""
+              val $resultTerm = -(${childCode.resultTerm})
+          """
+        }
+
+      case BitwiseAnd(left, right) =>
+        generateIfNonNull(left, right, expr.typeInfo) {
+          (leftTerm, rightTerm) => q"$leftTerm & $rightTerm"
+        }
+
+      case BitwiseOr(left, right) =>
+        generateIfNonNull(left, right, expr.typeInfo) {
+          (leftTerm, rightTerm) => q"$leftTerm | $rightTerm"
+        }
+
+      case BitwiseXor(left, right) =>
+        generateIfNonNull(left, right, expr.typeInfo) {
+          (leftTerm, rightTerm) => q"$leftTerm ^ $rightTerm"
+        }
+
+      case BitwiseNot(child) =>
+        val childCode = generateExpression(child)
+        if (nullCheck) {
+          childCode.code ++ q"""
+            val $nullTerm = ${childCode.nullTerm}
+            val $resultTerm = if ($nullTerm) {
+              ${defaultPrimitive(child.typeInfo)}
+            } else {
+              ~(${childCode.resultTerm})
+            }
+          """.children
+        } else {
+          childCode.code :+ q"""
+              val $resultTerm = ~(${childCode.resultTerm})
+          """
+        }
+
+      case Not(child) =>
+        val childCode = generateExpression(child)
+        if (nullCheck) {
+          childCode.code ++ q"""
+            val $nullTerm = ${childCode.nullTerm}
+            val $resultTerm = if ($nullTerm) {
+              ${defaultPrimitive(child.typeInfo)}
+            } else {
+              !(${childCode.resultTerm})
+            }
+          """.children
+        } else {
+          childCode.code :+ q"""
+              val $resultTerm = !(${childCode.resultTerm})
+          """
+        }
+
+      case IsNull(child) =>
+        val childCode = generateExpression(child)
+        if (nullCheck) {
+          childCode.code ++ q"""
+            val $nullTerm = ${childCode.nullTerm}
+            val $resultTerm = if ($nullTerm) {
+              ${defaultPrimitive(BOOLEAN_TYPE_INFO)}
+            } else {
+              (${childCode.resultTerm}) == null
+            }
+          """.children
+        } else {
+          childCode.code :+ q"""
+              val $resultTerm = (${childCode.resultTerm}) == null
+          """
+        }
+
+      case IsNotNull(child) =>
+        val childCode = generateExpression(child)
+        if (nullCheck) {
+          childCode.code ++ q"""
+            val $nullTerm = ${childCode.nullTerm}
+            val $resultTerm = if ($nullTerm) {
+              ${defaultPrimitive(BOOLEAN_TYPE_INFO)}
+            } else {
+              (${childCode.resultTerm}) != null
+            }
+          """.children
+        } else {
+          childCode.code :+ q"""
+              val $resultTerm = (${childCode.resultTerm}) != null
+          """
+        }
+
+      case Abs(child) =>
+        val childCode = generateExpression(child)
+        if (nullCheck) {
+          childCode.code ++ q"""
+            val $nullTerm = ${childCode.nullTerm}
+            val $resultTerm = if ($nullTerm) {
+              ${defaultPrimitive(child.typeInfo)}
+            } else {
+              Math.abs(${childCode.resultTerm})
+            }
+          """.children
+        } else {
+          childCode.code :+ q"""
+              val $resultTerm = Math.abs(${childCode.resultTerm})
+          """
+        }
+
+      case _ => throw new ExpressionException("Could not generate code for expression " + expr)
+    }
+
+    GeneratedExpression(code, resultTerm, nullTerm)
+  }
+
+  case class GeneratedExpression(code: Seq[Tree], resultTerm: TermName, nullTerm: TermName)
+
+  // We don't have c.freshName
+  // According to http://docs.scala-lang.org/overviews/quasiquotes/hygiene.html
+  // it's coming in 2.11. We can't wait that long...
+  def freshTermName(name: String): TermName = {
+    newTermName(s"$name$$${freshNameCounter.getAndIncrement}")
+  }
+
+  val freshNameCounter = new AtomicInteger
+
+  protected def getField(
+      inputTerm: TermName,
+      inputType: CompositeType[_],
+      fieldName: String,
+      fieldType: TypeInformation[_]): Tree = {
+    val accessor = fieldAccessorFor(inputType, fieldName)
+    accessor match {
+      case ObjectFieldAccessor(fieldName) =>
+        val fieldTerm = newTermName(fieldName)
+        q"$inputTerm.$fieldTerm.asInstanceOf[${typeTermForTypeInfo(fieldType)}]"
+
+      case ObjectMethodAccessor(methodName) =>
+        val methodTerm = newTermName(methodName)
+        q"$inputTerm.$methodTerm().asInstanceOf[${typeTermForTypeInfo(fieldType)}]"
+
+      case ProductAccessor(i) =>
+        q"$inputTerm.productElement($i).asInstanceOf[${typeTermForTypeInfo(fieldType)}]"
+
+    }
+  }
+
+  sealed abstract class FieldAccessor
+
+  case class ObjectFieldAccessor(fieldName: String) extends FieldAccessor
+  case class ObjectMethodAccessor(methodName: String) extends FieldAccessor
+  case class ProductAccessor(i: Int) extends FieldAccessor
+
+  def fieldAccessorFor(elementType: CompositeType[_], fieldName: String): FieldAccessor = {
+    elementType match {
+      case ri: RowTypeInfo =>
+        ProductAccessor(elementType.getFieldIndex(fieldName))
+
+      case cc: CaseClassTypeInfo[_] =>
+        ObjectFieldAccessor(fieldName)
+
+      case javaTup: TupleTypeInfo[_] =>
+        ObjectFieldAccessor(fieldName)
+
+      case pj: PojoTypeInfo[_] =>
+        ObjectFieldAccessor(fieldName)
+
+      case proxy: RenamingProxyTypeInfo[_] =>
+        val underlying = proxy.getUnderlyingType
+        val fieldIndex = proxy.getFieldIndex(fieldName)
+        fieldAccessorFor(underlying, underlying.getFieldNames()(fieldIndex))
+    }
+  }
+
+  protected def defaultPrimitive(tpe: TypeInformation[_]) = tpe match {
+    case BasicTypeInfo.INT_TYPE_INFO => ru.Literal(Constant(-1))
+    case BasicTypeInfo.LONG_TYPE_INFO => ru.Literal(Constant(1L))
+    case BasicTypeInfo.SHORT_TYPE_INFO => ru.Literal(Constant(-1.toShort))
+    case BasicTypeInfo.BYTE_TYPE_INFO => ru.Literal(Constant(-1.toByte))
+    case BasicTypeInfo.FLOAT_TYPE_INFO => ru.Literal(Constant(-1.0.toFloat))
+    case BasicTypeInfo.DOUBLE_TYPE_INFO => ru.Literal(Constant(-1.toDouble))
+    case BasicTypeInfo.BOOLEAN_TYPE_INFO => ru.Literal(Constant(false))
+    case BasicTypeInfo.STRING_TYPE_INFO => ru.Literal(Constant("<empty>"))
+    case BasicTypeInfo.CHAR_TYPE_INFO => ru.Literal(Constant('\0'))
+    case _ => ru.Literal(Constant(null))
+  }
+
+  protected def typeTermForTypeInfo(typeInfo: TypeInformation[_]): Tree = {
+    val tpe = typeForTypeInfo(typeInfo)
+    tq"$tpe"
+  }
+
+  // We need two separate methods here because typeForTypeInfo is recursive when generating
+  // the type for a type with generic parameters.
+  protected def typeForTypeInfo(tpe: TypeInformation[_]): Type = tpe match {
+
+    // From PrimitiveArrayTypeInfo we would get the class "int[]", which Scala reflection
+    // does not seem to like, so we manually give the correct type here.
+    case PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Int]]
+    case PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Long]]
+    case PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Short]]
+    case PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Byte]]
+    case PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Float]]
+    case PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Double]]
+    case PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Boolean]]
+    case PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Char]]
+
+    case _ =>
+      val clazz = mirror.staticClass(tpe.getTypeClass.getCanonicalName)
+
+      clazz.selfType.erasure match {
+        case ExistentialType(_, underlying) => underlying
+
+        case tpe@TypeRef(prefix, sym, Nil) =>
+          // Non-generic type, just return the type
+          tpe
+
+        case TypeRef(prefix, sym, params) =>
+          val genericTypeInfos = tpe.getGenericParameters.asScala
+          if (params.length != genericTypeInfos.length) {
+            throw new RuntimeException("Number of type parameters does not match.")
+          }
+          val typeParams = genericTypeInfos.map(typeForTypeInfo)
+          TypeRef(prefix, sym, typeParams.toList)
+      }
+
+  }
+
+}
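
To make the contract concrete: each case above contributes a few statements to
GeneratedExpression.code and exposes the computed value under resultTerm. With nullCheck
disabled, an expression like "foo > 42" over an input named input0 with an Int field foo
corresponds roughly to the following generated Scala source (a sketch; the fresh-name
suffixes and the exact field accessor depend on the input's CompositeType):

    // ResolvedFieldReference("foo"): field access through the composite type's accessor
    val result$0 = input0.foo.asInstanceOf[Int]
    // Literal(42, INT_TYPE_INFO)
    val result$1 = 42
    // GreaterThan, generated via generateIfNonNull
    val result$2 = result$0 > result$1

The concrete generators below wrap such statement sequences in a lambda and compile them
with the toolbox.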

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryPredicate.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryPredicate.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryPredicate.scala
new file mode 100644
index 0000000..ce80469
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryPredicate.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.codegen
+
+import org.apache.flink.api.expressions.tree.Expression
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.slf4j.LoggerFactory
+
+/**
+ * Code generator for binary predicates, i.e. a Join or CoGroup Predicate.
+ */
+class GenerateBinaryPredicate[L, R](
+    leftType: CompositeType[L],
+    rightType: CompositeType[R],
+    predicate: Expression,
+    cl: ClassLoader)
+  extends ExpressionCodeGenerator[(L, R) => Boolean](
+    Seq(("input0", leftType), ("input1", rightType)),
+    cl = cl) {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  override protected def generateInternal(): ((L, R) => Boolean) = {
+    val pred = generateExpression(predicate)
+
+    val in0 = newTermName("input0")
+    val in1 = newTermName("input1")
+
+    val leftTpe = typeTermForTypeInfo(leftType)
+    val rightTpe = typeTermForTypeInfo(rightType)
+
+    val code = if (nullCheck) {
+      q"""
+        ($in0: $leftTpe, $in1: $rightTpe) => {
+          ..${pred.code}
+          if (${pred.nullTerm}) {
+            false
+          } else {
+            ${pred.resultTerm}
+          }
+        }
+      """
+    } else {
+      q"""
+        ($in0: $leftTpe, $in1: $rightTpe) => {
+          ..${pred.code}
+          ${pred.resultTerm}
+        }
+      """
+    }
+
+    LOG.debug(s"""Generated binary predicate "$predicate":\n$code""")
+    toolBox.eval(code).asInstanceOf[(L, R) => Boolean]
+  }
+}
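
A rough usage sketch (not part of this commit; the case classes, field names, and the
tree-node constructors below are assumptions inferred from the patterns matched in
ExpressionCodeGenerator):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
    import org.apache.flink.api.common.typeutils.CompositeType
    import org.apache.flink.api.expressions.tree.{EqualTo, ResolvedFieldReference}
    import org.apache.flink.api.scala.createTypeInformation

    // Hypothetical input types; any CompositeType (e.g. a CaseClassTypeInfo) works as input.
    case class Order(id: Int, amount: Double)
    case class Payment(key: Int, value: Double)

    val leftType = createTypeInformation[Order].asInstanceOf[CompositeType[Order]]
    val rightType = createTypeInformation[Payment].asInstanceOf[CompositeType[Payment]]

    // Join predicate Order.id == Payment.key (field references shown already resolved).
    val predicate = EqualTo(
      ResolvedFieldReference("id", INT_TYPE_INFO),
      ResolvedFieldReference("key", INT_TYPE_INFO))

    val joinPredicate: (Order, Payment) => Boolean =
      new GenerateBinaryPredicate[Order, Payment](
        leftType, rightType, predicate, getClass.getClassLoader).generate()

    joinPredicate(Order(1, 12.5), Payment(1, 99.0))  // true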

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryResultAssembler.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryResultAssembler.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryResultAssembler.scala
new file mode 100644
index 0000000..4066831
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryResultAssembler.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.codegen
+
+import org.apache.flink.api.expressions.tree.Expression
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.slf4j.LoggerFactory
+
+/**
+ * Code generator for assembling the result of a binary operation.
+ */
+class GenerateBinaryResultAssembler[L, R, O](
+    leftTypeInfo: CompositeType[L],
+    rightTypeInfo: CompositeType[R],
+    resultTypeInfo: CompositeType[O],
+    outputFields: Seq[Expression],
+    cl: ClassLoader)
+  extends GenerateResultAssembler[(L, R, O) => O](
+    Seq(("input0", leftTypeInfo), ("input1", rightTypeInfo)),
+    cl = cl) {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  import scala.reflect.runtime.universe._
+
+
+  override protected def generateInternal(): ((L, R, O) => O) = {
+
+    val leftType = typeTermForTypeInfo(leftTypeInfo)
+    val rightType = typeTermForTypeInfo(rightTypeInfo)
+    val resultType = typeTermForTypeInfo(resultTypeInfo)
+
+    val resultCode = createResult(resultTypeInfo, outputFields)
+
+    val code: Tree =
+      q"""
+        (input0: $leftType, input1: $rightType, out: $resultType) => {
+          ..$resultCode
+        }
+      """
+
+    LOG.debug(s"Generated binary result-assembler:\n$code")
+    toolBox.eval(code).asInstanceOf[(L, R, O) => O]
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateResultAssembler.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateResultAssembler.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateResultAssembler.scala
new file mode 100644
index 0000000..977126d
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateResultAssembler.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.codegen
+
+import org.apache.flink.api.expressions.tree.Expression
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.expressions.typeinfo.RowTypeInfo
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, PojoTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+
+/**
+ * Base class for unary and binary result assembler code generators.
+ */
+abstract class GenerateResultAssembler[R](
+    inputs: Seq[(String, CompositeType[_])],
+    cl: ClassLoader)
+  extends ExpressionCodeGenerator[R](inputs, cl = cl) {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  def createResult[T](
+      resultTypeInfo: CompositeType[T],
+      outputFields: Seq[Expression]): Tree = {
+
+    val resultType = typeTermForTypeInfo(resultTypeInfo)
+
+    val fieldsCode = outputFields.map(generateExpression)
+
+    val block = resultTypeInfo match {
+      case ri: RowTypeInfo =>
+        val resultSetters: Seq[Tree] = fieldsCode.zipWithIndex map {
+          case (fieldCode, i) =>
+            q"""
+              out.setField($i, { ..${fieldCode.code}; ${fieldCode.resultTerm} })
+            """
+        }
+
+        q"""
+          ..$resultSetters
+          out
+        """
+
+      case pj: PojoTypeInfo[_] =>
+        val resultSetters: Seq[Tree] = fieldsCode.zip(outputFields) map {
+          case (fieldCode, expr) =>
+            val fieldName = newTermName(expr.name)
+            q"""
+              out.$fieldName = { ..${fieldCode.code}; ${fieldCode.resultTerm} }
+            """
+        }
+
+        q"""
+          ..$resultSetters
+          out
+        """
+
+      case tup: TupleTypeInfo[_] =>
+        val resultSetters: Seq[Tree] = fieldsCode.zip(outputFields) map {
+          case (fieldCode, expr) =>
+            val fieldName = newTermName(expr.name)
+            q"""
+              out.$fieldName = { ..${fieldCode.code}; ${fieldCode.resultTerm} }
+            """
+        }
+
+        q"""
+          ..$resultSetters
+          out
+        """
+
+      case cc: CaseClassTypeInfo[_] =>
+        val resultFields: Seq[Tree] = fieldsCode map {
+          fieldCode =>
+            q"{ ..${fieldCode.code}; ${fieldCode.resultTerm}}"
+        }
+        q"""
+          new $resultType(..$resultFields)
+        """
+    }
+
+    block
+  }
+
+}
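
As an illustration of createResult: for a RowTypeInfo result with two output fields, the
generated block corresponds roughly to the following (a sketch; the field expressions and
term names are placeholders), while for a case class result the same per-field blocks are
passed to the constructor instead:

    // Row output: one setField call per output expression, then return `out`.
    out.setField(0, { val result$0 = input0._1.asInstanceOf[Int]; result$0 })
    out.setField(1, { val result$1 = input0._2.asInstanceOf[String]; result$1 })
    out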

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryPredicate.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryPredicate.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryPredicate.scala
new file mode 100644
index 0000000..cd8edc4
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryPredicate.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.codegen
+
+import org.apache.flink.api.expressions.tree.Expression
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.slf4j.LoggerFactory
+
+/**
+ * Code generator for a unary predicate, i.e. a Filter.
+ */
+class GenerateUnaryPredicate[T](
+    inputType: CompositeType[T],
+    predicate: Expression,
+    cl: ClassLoader) extends ExpressionCodeGenerator[T => Boolean](
+      Seq(("input0", inputType)),
+      cl = cl) {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  override protected def generateInternal(): (T => Boolean) = {
+    val pred = generateExpression(predicate)
+
+    val tpe = typeTermForTypeInfo(inputType)
+
+    val code = if (nullCheck) {
+      q"""
+        (input0: $tpe) => {
+          ..${pred.code}
+          if (${pred.nullTerm}) {
+            false
+          } else {
+            ${pred.resultTerm}
+          }
+        }
+      """
+    } else {
+      q"""
+        (input0: $tpe) => {
+          ..${pred.code}
+          ${pred.resultTerm}
+        }
+      """
+    }
+
+    LOG.debug(s"""Generated unary predicate "$predicate":\n$code""")
+    toolBox.eval(code).asInstanceOf[(T) => Boolean]
+  }
+}
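
Putting the pieces together: with nullCheck disabled, the tree handed to toolBox.eval for a
filter such as "amount > 42" corresponds roughly to this source (a sketch; the input type
and term names are illustrative). With nullCheck enabled, the lambda instead returns false
whenever the predicate's null flag is set:

    // Shape of the compiled filter function for a hypothetical input type Order.
    (input0: Order) => {
      val result$0 = input0.amount.asInstanceOf[Double]
      val result$1 = 42
      val result$2 = result$0 > result$1
      result$2
    }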

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryResultAssembler.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryResultAssembler.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryResultAssembler.scala
new file mode 100644
index 0000000..49970c1
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryResultAssembler.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.codegen
+
+import org.apache.flink.api.expressions.tree.Expression
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.slf4j.LoggerFactory
+
+/**
+ * Code generator for assembling the result of a unary operation.
+ */
+class GenerateUnaryResultAssembler[I, O](
+    inputTypeInfo: CompositeType[I],
+    resultTypeInfo: CompositeType[O],
+    outputFields: Seq[Expression],
+    cl: ClassLoader)
+  extends GenerateResultAssembler[(I, O) => O](
+    Seq(("input0", inputTypeInfo)),
+    cl = cl) {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  import scala.reflect.runtime.universe._
+
+  override protected def generateInternal(): ((I, O) => O) = {
+
+    val inputType = typeTermForTypeInfo(inputTypeInfo)
+    val resultType = typeTermForTypeInfo(resultTypeInfo)
+
+    val resultCode = createResult(resultTypeInfo, outputFields)
+
+    val code: Tree =
+      q"""
+        (input0: $inputType, out: $resultType) => {
+          ..$resultCode
+        }
+      """
+
+    LOG.debug(s"Generated unary result-assembler:\n${show(code)}")
+    toolBox.eval(code).asInstanceOf[(I, O) => O]
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/package.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/package.scala
new file mode 100644
index 0000000..0b7c95d
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions
+
+package object codegen {
+  // Used in ExpressionCodeGenerator because Scala 2.10 reflection is not thread-safe. We might
+  // have several parallel expression operators in one TaskManager, so we need to guard
+  // these operations.
+  object ReflectionLock
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/ExpandAggregations.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/ExpandAggregations.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/ExpandAggregations.scala
new file mode 100644
index 0000000..723483c
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/ExpandAggregations.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.operations
+
+import org.apache.flink.api.expressions.analysis.SelectionAnalyzer
+import org.apache.flink.api.expressions.tree._
+import org.apache.flink.api.java.aggregation.Aggregations
+
+import scala.collection.mutable
+
+/**
+ * This is used to expand a [[Select]] that contains aggregations. If it is called on a [[Select]]
+ * without aggregations, the [[Select]] is returned unchanged.
+ *
+ * This select:
+ * {{{
+ *   in.select('key, 'value.avg)
+ * }}}
+ *
+ * is transformed to this expansion:
+ * {{{
+ *   in
+ *     .select('key, 'value, Literal(1) as 'intermediate.1)
+ *     .aggregate('value.sum, 'intermediate.1.sum)
+ *     .select('key, 'value / 'intermediate.1)
+ * }}}
+ *
+ * If the input of the [[Select]] is a [[GroupBy]] this is preserved before the aggregation.
+ */
+object ExpandAggregations {
+  def apply(select: Select): Operation = select match {
+    case Select(input, selection) =>
+
+      val aggregations = mutable.HashMap[(Expression, Aggregations), String]()
+      val intermediateFields = mutable.HashSet[Expression]()
+      val aggregationIntermediates = mutable.HashMap[Aggregation, Seq[Expression]]()
+
+      var intermediateCount = 0
+      selection foreach {  f =>
+        f.transformPre {
+          case agg: Aggregation =>
+            val intermediateReferences = agg.getIntermediateFields.zip(agg.getAggregations) map {
+              case (expr, basicAgg) =>
+                aggregations.get((expr, basicAgg)) match {
+                  case Some(intermediateName) =>
+                    ResolvedFieldReference(intermediateName, expr.typeInfo)
+                  case None =>
+                    intermediateCount = intermediateCount + 1
+                    val intermediateName = s"intermediate.$intermediateCount"
+                    intermediateFields += Naming(expr, intermediateName)
+                    aggregations((expr, basicAgg)) = intermediateName
+                    ResolvedFieldReference(intermediateName, expr.typeInfo)
+                }
+            }
+
+            aggregationIntermediates(agg) = intermediateReferences
+            // Return a NOP so that we don't add the children of the aggregation
+            // to intermediate fields. We already added the necessary fields to the list
+            // of intermediate fields.
+            NopExpression()
+
+          case fa: ResolvedFieldReference =>
+            if (!fa.name.startsWith("intermediate")) {
+              intermediateFields += Naming(fa, fa.name)
+            }
+            fa
+        }
+      }
+
+      if (aggregations.isEmpty) {
+        // no aggregations, just return
+        return select
+      }
+
+      // Also add the grouping keys to the set of intermediate fields. Because we use a Set,
+      // they are only added when not already present.
+      input match {
+        case GroupBy(_, groupingFields) =>
+          groupingFields foreach {
+            case fa: ResolvedFieldReference =>
+              intermediateFields += Naming(fa, fa.name)
+          }
+        case _ => // Nothing to add
+      }
+
+      val basicAggregations = aggregations.map {
+        case ((expr, basicAgg), fieldName) =>
+          (fieldName, basicAgg)
+      }
+
+      val finalFields = selection.map {  f =>
+        f.transformPre {
+          case agg: Aggregation =>
+            val intermediates = aggregationIntermediates(agg)
+            agg.getFinalField(intermediates)
+        }
+      }
+
+      val intermediateAnalyzer = new SelectionAnalyzer(input.outputFields)
+      val analyzedIntermediates = intermediateFields.toSeq.map(intermediateAnalyzer.analyze)
+
+      val finalAnalyzer =
+        new SelectionAnalyzer(analyzedIntermediates.map(e => (e.name, e.typeInfo)))
+      val analyzedFinals = finalFields.map(finalAnalyzer.analyze)
+
+      val result = input match {
+        case GroupBy(groupByInput, groupingFields) =>
+          Select(
+            Aggregate(
+              GroupBy(
+                Select(groupByInput, analyzedIntermediates),
+                groupingFields),
+              basicAggregations.toSeq),
+            analyzedFinals)
+
+        case _ =>
+          Select(
+            Aggregate(
+              Select(input, analyzedIntermediates),
+              basicAggregations.toSeq),
+            analyzedFinals)
+
+      }
+
+      result
+
+    case _ => select
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/OperationTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/OperationTranslator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/OperationTranslator.scala
new file mode 100644
index 0000000..12e4793
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/OperationTranslator.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.operations
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/**
+ * When an [[org.apache.flink.api.expressions.ExpressionOperation]] is created, an
+ * [[OperationTranslator]] corresponding to the underlying representation (either
+ * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]])
+ * is also created. This way, the expression API can be completely agnostic, while translation
+ * back to the correct API is handled by the API-specific translator.
+ */
+abstract class OperationTranslator {
+
+  type Representation[A]
+
+  def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): Representation[A]
+
+}
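
A hypothetical, heavily simplified translator for the batch API might look like the sketch
below; the class name and the handling of anything beyond Root are illustrative assumptions,
not the translator shipped in this change:

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.expressions.operations.{Operation, OperationTranslator, Root}
    import org.apache.flink.api.scala.DataSet

    class ExampleDataSetTranslator extends OperationTranslator {

      type Representation[A] = DataSet[A]

      def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): DataSet[A] =
        op match {
          // Root wraps the original DataSet, so it translates to itself.
          case Root(dataSet: DataSet[A] @unchecked, _) => dataSet
          // Filter, Select, Join, ... would map to the corresponding DataSet
          // transformations, using the code generators from the codegen package.
          case other =>
            throw new UnsupportedOperationException(s"Not sketched: $other")
        }
    }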

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/operations.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/operations.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/operations.scala
new file mode 100644
index 0000000..d036631
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/operations.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.operations
+
+import org.apache.flink.api.expressions.tree.Expression
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.aggregation.Aggregations
+
+/**
+ * Base class for all expression operations.
+ */
+sealed abstract class Operation {
+  def outputFields: Seq[(String, TypeInformation[_])]
+}
+
+/**
+ * Operation that transforms a [[org.apache.flink.api.scala.DataSet]] or
+ * [[org.apache.flink.streaming.api.scala.DataStream]] into an expression operation.
+ */
+case class Root[T](input: T, outputFields: Seq[(String, TypeInformation[_])]) extends Operation
+
+/**
+ * Operation that joins two expression operations. A "filter" and a "select" should be applied
+ * after a join operation.
+ */
+case class Join(left: Operation, right: Operation) extends Operation {
+  def outputFields = left.outputFields ++ right.outputFields
+
+  override def toString = s"Join($left, $right)"
+}
+
+/**
+ * Operation that filters out elements that do not match the predicate expression.
+ */
+case class Filter(input: Operation, predicate: Expression) extends Operation {
+  def outputFields = input.outputFields
+
+  override def toString = s"Filter($input, $predicate)"
+}
+
+/**
+ * Selection expression. Similar to an SQL SELECT statement. The expressions can select fields
+ * and perform arithmetic or logic operations. The expressions can also perform aggregates
+ * on fields.
+ */
+case class Select(input: Operation, selection: Seq[Expression]) extends Operation {
+  def outputFields = selection.toSeq map { e => (e.name, e.typeInfo) }
+
+  override def toString = s"Select($input, ${selection.mkString(",")})"
+}
+
+/**
+ * Operation that gives new names to fields. Use this to disambiguate fields before a join
+ * operation.
+ */
+case class As(input: Operation, names: Seq[String]) extends Operation {
+  val outputFields = input.outputFields.zip(names) map {
+    case ((_, tpe), newName) => (newName, tpe)
+  }
+
+  override def toString = s"As($input, ${names.mkString(",")})"
+}
+
+/**
+ * Grouping operation. Keys are specified using field references. A group by operation is only
+ * useful when performing a select with aggregations afterwards.
+ * @param input the input operation
+ * @param fields the grouping key field references
+ */
+case class GroupBy(input: Operation, fields: Seq[Expression]) extends Operation {
+  def outputFields = input.outputFields
+
+  override def toString = s"GroupBy($input, ${fields.mkString(",")})"
+}
+
+/**
+ * Internal operation. Selection operations containing aggregates are expanded to an [[Aggregate]]
+ * and a simple [[Select]].
+ */
+case class Aggregate(
+    input: Operation,
+    aggregations: Seq[(String, Aggregations)]) extends Operation {
+  def outputFields = input.outputFields
+
+  override def toString = s"Aggregate($input, ${aggregations.mkString(",")})"
+}
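
For illustration, a query along the lines of in.filter('amount > 100).select('name)
corresponds to a nesting of these case classes roughly as follows (a sketch; the expression
node constructors are assumed from the tree package, and the field references are shown
already resolved):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, STRING_TYPE_INFO}
    import org.apache.flink.api.expressions.tree.{GreaterThan, Literal, ResolvedFieldReference}

    // `in` stands for the wrapped DataSet or DataStream of the Root operation.
    val tree =
      Select(
        Filter(
          Root(in, Seq("name" -> STRING_TYPE_INFO, "amount" -> INT_TYPE_INFO)),
          GreaterThan(
            ResolvedFieldReference("amount", INT_TYPE_INFO),
            Literal(100, INT_TYPE_INFO))),
        Seq(ResolvedFieldReference("name", STRING_TYPE_INFO)))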

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/package.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/package.scala
new file mode 100644
index 0000000..2aa80b3
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions
+
+/**
+ * The operations in this package are created by calling methods on [[ExpressionOperation]];
+ * they should not be created manually by users of the API.
+ */
+package object operations

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionAggregateFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionAggregateFunction.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionAggregateFunction.scala
new file mode 100644
index 0000000..5a2b87d
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionAggregateFunction.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.runtime
+
+import org.apache.flink.api.expressions.Row
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
+import org.apache.flink.api.java.aggregation.AggregationFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+
+@Combinable
+class ExpressionAggregateFunction(
+    private val fieldPositions: Seq[Int],
+    private val functions: Seq[AggregationFunction[Any]])
+  extends RichGroupReduceFunction[Row, Row] {
+
+  override def open(conf: Configuration): Unit = {
+    var i = 0
+    val len = functions.length
+    while (i < len) {
+      functions(i).initializeAggregate()
+      i += 1
+    }
+  }
+
+  override def reduce(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = {
+
+    val fieldPositions = this.fieldPositions
+    val functions = this.functions
+
+    var current: Row = null
+
+    val values = in.iterator()
+    while (values.hasNext) {
+      current = values.next()
+
+      var i = 0
+      val len = functions.length
+      while (i < len) {
+        functions(i).aggregate(current.productElement(fieldPositions(i)))
+        i += 1
+      }
+    }
+
+    var i = 0
+    val len = functions.length
+    while (i < len) {
+      current.setField(fieldPositions(i), functions(i).getAggregate)
+      functions(i).initializeAggregate()
+      i += 1
+    }
+
+    out.collect(current)
+  }
+
+}

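For readers skimming the runtime classes, here is a tiny self-contained sketch (plain Scala arrays, not the Flink classes above, and hypothetical data) of how `fieldPositions` and the per-position aggregation functions pair up when a group of rows is folded into one result row; the real function drives Flink's `AggregationFunction` instances instead of the ad-hoc sums below.

    // Toy stand-in: rows are Array[Any]; sum positions 0 and 2, keep the last row's other fields.
    val fieldPositions = Seq(0, 2)
    val group: Seq[Array[Any]] = Seq(Array(1, "a", 10), Array(2, "b", 20))

    val sums = Array.fill(fieldPositions.length)(0)
    for (row <- group; i <- fieldPositions.indices) {
      sums(i) += row(fieldPositions(i)).asInstanceOf[Int]
    }

    // Like reduce() above: write the aggregates back into the last row seen and emit it.
    val result = group.last.clone()
    for (i <- fieldPositions.indices) {
      result(fieldPositions(i)) = sums(i)
    }
    // result now holds Array(3, "b", 30)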
http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionFilterFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionFilterFunction.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionFilterFunction.scala
new file mode 100644
index 0000000..d766486
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionFilterFunction.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.runtime
+
+import org.apache.flink.api.expressions.codegen.GenerateUnaryPredicate
+import org.apache.flink.api.expressions.tree.{NopExpression, Expression}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.configuration.Configuration
+
+class ExpressionFilterFunction[T](
+    predicate: Expression,
+    inputType: CompositeType[T]) extends RichFilterFunction[T] {
+
+  var compiledPredicate: (T) => Boolean = null
+
+  override def open(config: Configuration): Unit = {
+    if (compiledPredicate == null) {
+      compiledPredicate = predicate match {
+        case n: NopExpression => null
+        case _ =>
+          val codegen = new GenerateUnaryPredicate[T](
+            inputType,
+            predicate,
+            getRuntimeContext.getUserCodeClassLoader)
+          codegen.generate()
+      }
+    }
+  }
+
+  // A NopExpression predicate compiles to null, in which case every element passes the filter.
+  override def filter(in: T) = compiledPredicate == null || compiledPredicate(in)
+}

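As a rough illustration (not the actual generated source), the function produced by GenerateUnaryPredicate is conceptually just a closure of type `T => Boolean`. For a hypothetical input case class and a predicate along the lines of `'age > 30`, it behaves like:

    case class Person(name: String, age: Int)

    // Conceptual shape of a compiled unary predicate for 'age > 30:
    val compiledPredicate: Person => Boolean = (in: Person) => in.age > 30

    compiledPredicate(Person("Alice", 42))  // true
    compiledPredicate(Person("Bob", 7))     // false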
http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionJoinFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionJoinFunction.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionJoinFunction.scala
new file mode 100644
index 0000000..46715cf
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionJoinFunction.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.runtime
+
+import org.apache.flink.api.expressions.tree.{NopExpression, Expression}
+import org.apache.flink.api.common.functions.RichFlatJoinFunction
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.expressions.codegen.{GenerateBinaryResultAssembler,
+GenerateBinaryPredicate}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+
+class ExpressionJoinFunction[L, R, O](
+    predicate: Expression,
+    leftType: CompositeType[L],
+    rightType: CompositeType[R],
+    resultType: CompositeType[O],
+    outputFields: Seq[Expression]) extends RichFlatJoinFunction[L, R, O] {
+
+  var compiledPredicate: (L, R) => Boolean = null
+  var resultAssembler: (L, R, O) => O = null
+  var result: O = null.asInstanceOf[O]
+
+  override def open(config: Configuration): Unit = {
+    result = resultType.createSerializer(getRuntimeContext.getExecutionConfig).createInstance()
+    if (compiledPredicate == null) {
+      compiledPredicate = predicate match {
+        case n: NopExpression => null
+        case _ =>
+          val codegen = new GenerateBinaryPredicate[L, R](
+            leftType,
+            rightType,
+            predicate,
+            getRuntimeContext.getUserCodeClassLoader)
+          codegen.generate()
+      }
+    }
+
+    if (resultAssembler == null) {
+      val resultCodegen = new GenerateBinaryResultAssembler[L, R, O](
+        leftType,
+        rightType,
+        resultType,
+        outputFields,
+        getRuntimeContext.getUserCodeClassLoader)
+
+      resultAssembler = resultCodegen.generate()
+    }
+  }
+
+  def join(left: L, right: R, out: Collector[O]) = {
+    if (compiledPredicate == null || compiledPredicate(left, right)) {
+      result = resultAssembler(left, right, result)
+      out.collect(result)
+    }
+  }
+}

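Two code-generated functions drive this join: a binary predicate and a binary result assembler. When the join condition is a NopExpression, `compiledPredicate` stays null and every pair is emitted. A hedged sketch of the shapes involved, with hypothetical `OrderRow`/`PaymentRow`/`OutRow` types (the real assembler fills and returns the reused mutable `result` instance rather than allocating a new object):

    case class OrderRow(id: Int, item: String)
    case class PaymentRow(id: Int, amount: Double)
    case class OutRow(item: String, amount: Double)

    // Conceptual shape of a compiled binary predicate for a condition like 'id === 'id:
    val predicate: (OrderRow, PaymentRow) => Boolean = (l, r) => l.id == r.id

    // Conceptual shape of the binary result assembler for output fields ('item, 'amount):
    val assemble: (OrderRow, PaymentRow, OutRow) => OutRow =
      (l, r, reuse) => OutRow(l.item, r.amount)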
http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionSelectFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionSelectFunction.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionSelectFunction.scala
new file mode 100644
index 0000000..5cdd8b2
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionSelectFunction.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.runtime
+
+import org.apache.flink.api.expressions.tree.Expression
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.expressions.codegen.GenerateUnaryResultAssembler
+import org.apache.flink.configuration.Configuration
+
+class ExpressionSelectFunction[I, O](
+     inputType: CompositeType[I],
+     resultType: CompositeType[O],
+     outputFields: Seq[Expression]) extends RichMapFunction[I, O] {
+
+  var resultAssembler: (I, O) => O = null
+  var result: O = null.asInstanceOf[O]
+
+  override def open(config: Configuration): Unit = {
+    result = resultType.createSerializer(getRuntimeContext.getExecutionConfig).createInstance()
+
+    if (resultAssembler == null) {
+      val resultCodegen = new GenerateUnaryResultAssembler[I, O](
+        inputType,
+        resultType,
+        outputFields,
+        getRuntimeContext.getUserCodeClassLoader)
+
+      resultAssembler = resultCodegen.generate()
+    }
+  }
+
+  def map(in: I): O = {
+    resultAssembler(in, result)
+  }
+}

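The map-side counterpart is a single generated unary result assembler of type `(I, O) => O` that fills and returns the reused output instance. A hedged sketch with hypothetical types, for a projection along the lines of select('word, 'count * 2):

    case class WC(word: String, count: Int)
    case class Projected(word: String, doubled: Int)

    // Conceptual shape of the generated unary result assembler (the real one mutates `reuse`):
    val assemble: (WC, Projected) => Projected =
      (in, reuse) => Projected(in.word, in.count * 2)

    assemble(WC("flink", 3), null)  // Projected("flink", 6)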
http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/package.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/package.scala
new file mode 100644
index 0000000..0a3d683
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/package.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions
+
+/**
+ * The functions in this package are used to translate expression operations to Java API operations.
+ */
+package object runtime

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/Expression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/Expression.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/Expression.scala
new file mode 100644
index 0000000..8264705
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/Expression.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.tree
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, TypeInformation}
+
+import scala.language.postfixOps
+
+
+abstract class Expression extends Product {
+  def children: Seq[Expression]
+  def name: String = Expression.freshName("expression")
+  def typeInfo: TypeInformation[_]
+
+  /**
+   * Tests for equality by first testing for reference equality.
+   */
+  def fastEquals(other: Expression): Boolean = this.eq(other) || this == other
+
+  def transformPre(rule: PartialFunction[Expression, Expression]): Expression = {
+    val afterTransform = rule.applyOrElse(this, identity[Expression])
+
+    if (afterTransform fastEquals this) {
+      this.transformChildrenPre(rule)
+    } else {
+      afterTransform.transformChildrenPre(rule)
+    }
+  }
+
+  def transformChildrenPre(rule: PartialFunction[Expression, Expression]): Expression = {
+    var changed = false
+    val newArgs = productIterator map {
+      case child: Expression if children.contains(child) =>
+        val newChild = child.transformPre(rule)
+        if (newChild fastEquals child) {
+          child
+        } else {
+          changed = true
+          newChild
+        }
+      case other: AnyRef => other
+      case null => null
+    } toArray
+
+    if (changed) makeCopy(newArgs) else this
+  }
+
+  def transformPost(rule: PartialFunction[Expression, Expression]): Expression = {
+    val afterChildren = transformChildrenPost(rule)
+    if (afterChildren fastEquals this) {
+      rule.applyOrElse(this, identity[Expression])
+    } else {
+      rule.applyOrElse(afterChildren, identity[Expression])
+    }
+  }
+
+  def transformChildrenPost(rule: PartialFunction[Expression, Expression]): Expression = {
+    var changed = false
+    val newArgs = productIterator map {
+      case child: Expression if children.contains(child) =>
+        val newChild = child.transformPost(rule)
+        if (newChild fastEquals child) {
+          child
+        } else {
+          changed = true
+          newChild
+        }
+      case other: AnyRef => other
+      case null => null
+    } toArray
+    // toArray forces evaluation, toSeq does not seem to work here
+
+    if (changed) makeCopy(newArgs) else this
+  }
+
+  def exists(predicate: Expression => Boolean): Boolean = {
+    var exists = false
+    this.transformPre {
+      case e: Expression =>
+        if (predicate(e)) {
+          exists = true
+        }
+        e
+    }
+    exists
+  }
+
+  /**
+   * Creates a new copy of this expression with new children. This is used during transformation
+   * if children change. This must be overridden by expressions whose constructor arguments
+   * are not in the same order as their `children`.
+   */
+  def makeCopy(newArgs: Seq[AnyRef]): this.type = {
+    val defaultCtor =
+      this.getClass.getConstructors.find { _.getParameterTypes.size > 0}.head
+    try {
+      defaultCtor.newInstance(newArgs.toArray: _*).asInstanceOf[this.type]
+    } catch {
+      case iae: IllegalArgumentException =>
+        throw new RuntimeException(
+          s"Failed to copy expression $this with new children ${newArgs.mkString(", ")}.", iae)
+    }
+  }
+}
+
+abstract class BinaryExpression() extends Expression {
+  def left: Expression
+  def right: Expression
+  def children = Seq(left, right)
+}
+
+abstract class UnaryExpression() extends Expression {
+  def child: Expression
+  def children = Seq(child)
+}
+
+abstract class LeafExpression() extends Expression {
+  val children = Nil
+}
+
+case class NopExpression() extends LeafExpression {
+  val typeInfo = new NothingTypeInfo()
+  override val name = Expression.freshName("nop")
+}
+
+object Expression {
+  def freshName(prefix: String): String = {
+    s"$prefix-${freshNameCounter.getAndIncrement}"
+  }
+
+  val freshNameCounter = new AtomicInteger
+}

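The transformPre/transformPost pair is the workhorse for tree rewrites: the rule is applied top-down or bottom-up, and makeCopy rebuilds any node whose children changed. A small hedged sketch, assuming the Literal leaf defined elsewhere in this patch wraps a single constant value:

    // Bottom-up rewrite that drops additions of zero (Plus and Literal come from this patch).
    val tree: Expression = Plus(Plus(Literal(1), Literal(0)), Literal(2))

    val simplified = tree.transformPost {
      case Plus(left, Literal(0)) => left
    }
    // simplified == Plus(Literal(1), Literal(2))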
http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/aggregations.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/aggregations.scala
new file mode 100644
index 0000000..d3b19fa
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/aggregations.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.tree
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.aggregation.Aggregations
+
+
+abstract sealed class Aggregation extends UnaryExpression {
+  def typeInfo = {
+    child.typeInfo match {
+      case BasicTypeInfo.LONG_TYPE_INFO => // ok
+      case BasicTypeInfo.INT_TYPE_INFO =>
+      case BasicTypeInfo.DOUBLE_TYPE_INFO =>
+      case BasicTypeInfo.FLOAT_TYPE_INFO =>
+      case BasicTypeInfo.BYTE_TYPE_INFO =>
+      case BasicTypeInfo.SHORT_TYPE_INFO =>
+      case _ =>
+        throw new ExpressionException(s"Unsupported type ${child.typeInfo} for " +
+          s"aggregation $this. Only numeric data types are supported.")
+    }
+    child.typeInfo
+  }
+
+  override def toString = s"Aggregate($child)"
+
+  def getIntermediateFields: Seq[Expression]
+  def getFinalField(inputs: Seq[Expression]): Expression
+  def getAggregations: Seq[Aggregations]
+}
+
+case class Sum(child: Expression) extends Aggregation {
+  override def toString = s"($child).sum"
+
+  override def getIntermediateFields: Seq[Expression] = Seq(child)
+  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
+  override def getAggregations = Seq(Aggregations.SUM)
+}
+
+case class Min(child: Expression) extends Aggregation {
+  override def toString = s"($child).min"
+
+  override def getIntermediateFields: Seq[Expression] = Seq(child)
+  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
+  override def getAggregations = Seq(Aggregations.MIN)
+
+}
+
+case class Max(child: Expression) extends Aggregation {
+  override def toString = s"($child).max"
+
+  override def getIntermediateFields: Seq[Expression] = Seq(child)
+  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
+  override def getAggregations = Seq(Aggregations.MAX)
+}
+
+case class Count(child: Expression) extends Aggregation {
+  override def typeInfo = {
+    child.typeInfo match {
+      case _ => // we can count anything... :D
+    }
+    BasicTypeInfo.INT_TYPE_INFO
+  }
+
+  override def toString = s"($child).count"
+
+  override def getIntermediateFields: Seq[Expression] = Seq(Literal(Integer.valueOf(1)))
+  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
+  override def getAggregations = Seq(Aggregations.SUM)
+
+}
+
+case class Avg(child: Expression) extends Aggregation {
+  override def toString = s"($child).avg"
+
+  override def getIntermediateFields: Seq[Expression] = Seq(child, Literal(1))
+  // This is just sweet. Use our own AST representation and let the code generator do
+  // our dirty work.
+  override def getFinalField(inputs: Seq[Expression]): Expression =
+    Div(inputs(0), inputs(1))
+  override def getAggregations = Seq(Aggregations.SUM, Aggregations.SUM)
+
+}

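To make the Avg decomposition above concrete, a short hedged sketch (again assuming the single-argument Literal leaf from this patch): the aggregation is split into two SUM fields, and the final value is expressed as a Div node that the code generator later turns into the actual division.

    val avg = Avg(Literal(42))

    avg.getIntermediateFields                        // Seq(Literal(42), Literal(1))
    avg.getAggregations                              // Seq(Aggregations.SUM, Aggregations.SUM)
    avg.getFinalField(Seq(Literal(84), Literal(2)))  // Div(Literal(84), Literal(2))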
http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/arithmetic.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/arithmetic.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/arithmetic.scala
new file mode 100644
index 0000000..e3f62f8
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/arithmetic.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.tree
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo, NumericTypeInfo, TypeInformation}
+
+abstract class BinaryArithmetic extends BinaryExpression {
+  def typeInfo = {
+    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
+      throw new ExpressionException(
+        s"""Non-numeric operand ${left} of type ${left.typeInfo} in $this""")
+    }
+    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
+      throw new ExpressionException(
+        s"""Non-numeric operand "${right}" of type ${right.typeInfo} in $this""")
+    }
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    left.typeInfo
+  }
+}
+
+case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
+  override def typeInfo = {
+    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
+      left.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) {
+      throw new ExpressionException(
+        s"Non-numeric, non-String operand type ${left.typeInfo} in $this")
+    }
+    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
+      right.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) {
+      throw new ExpressionException(
+        s"Non-numeric, non-String operand type ${right.typeInfo} in $this")
+    }
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    left.typeInfo
+  }
+
+  override def toString = s"($left + $right)"
+}
+
+case class UnaryMinus(child: Expression) extends UnaryExpression {
+  def typeInfo = {
+    if (!child.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
+      throw new ExpressionException(
+        s"""Non-numeric operand ${child} of type ${child.typeInfo} in $this""")
+    }
+    child.typeInfo
+  }
+
+  override def toString = s"-($child)"
+}
+
+case class Minus(left: Expression, right: Expression) extends BinaryArithmetic {
+  override def toString = s"($left - $right)"
+}
+
+case class Div(left: Expression, right: Expression) extends BinaryArithmetic {
+  override def toString = s"($left / $right)"
+}
+
+case class Mul(left: Expression, right: Expression) extends BinaryArithmetic {
+  override def toString = s"($left * $right)"
+}
+
+case class Mod(left: Expression, right: Expression) extends BinaryArithmetic {
+  override def toString = s"($left % $right)"
+}
+
+case class Abs(child: Expression) extends UnaryExpression {
+  def typeInfo = child.typeInfo
+
+  override def toString = s"abs($child)"
+}
+
+abstract class BitwiseBinaryArithmetic extends BinaryExpression {
+  def typeInfo: TypeInformation[_] = {
+    if (!left.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+      throw new ExpressionException(
+        s"""Non-integer operand ${left} of type ${left.typeInfo} in $this""")
+    }
+    if (!right.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+      throw new ExpressionException(
+        s"""Non-integer operand "${right}" of type ${right.typeInfo} in $this""")
+    }
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    if (left.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
+      left.typeInfo
+    } else {
+      BasicTypeInfo.INT_TYPE_INFO
+    }
+  }
+}
+
+case class BitwiseAnd(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
+  override def toString = s"($left & $right)"
+}
+
+case class BitwiseOr(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
+  override def toString = s"($left | $right)"
+}
+
+case class BitwiseXor(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
+  override def toString = s"($left ^ $right)"
+}
+
+case class BitwiseNot(child: Expression) extends UnaryExpression {
+  def typeInfo: TypeInformation[_] = {
+    if (!child.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+      throw new ExpressionException(
+        s"""Non-integer operand ${child} of type ${child.typeInfo} in $this""")
+    }
+    if (child.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
+      child.typeInfo
+    } else {
+      BasicTypeInfo.INT_TYPE_INFO
+    }
+  }
+
+  override def toString = s"~($child)"
+}
+

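The type rules above can be exercised directly on small trees. A hedged sketch, assuming Literal infers a BasicTypeInfo from its value:

    Plus(Literal(1), Literal(2)).typeInfo      // INT_TYPE_INFO
    Plus(Literal("a"), Literal("b")).typeInfo  // STRING_TYPE_INFO, since Plus also covers concatenation
    Plus(Literal(1), Literal(2.0)).typeInfo    // throws ExpressionException: differing operand types
    Mod(Literal(7), Literal(3)).typeInfo       // INT_TYPE_INFO, via the shared BinaryArithmetic check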
http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/cast.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/cast.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/cast.scala
new file mode 100644
index 0000000..f83de5b
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/cast.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.tree
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression {
+  def typeInfo = tpe
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/comparison.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/comparison.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/comparison.scala
new file mode 100644
index 0000000..fdb5fd0
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/comparison.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.tree
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo}
+
+abstract class BinaryComparison extends BinaryExpression {
+  def typeInfo = {
+    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
+      throw new ExpressionException(s"Non-numeric operand ${left} in $this")
+    }
+    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
+      throw new ExpressionException(s"Non-numeric operand ${right} in $this")
+    }
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+}
+
+case class EqualTo(left: Expression, right: Expression) extends BinaryComparison {
+  override def typeInfo = {
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+
+  override def toString = s"$left === $right"
+}
+
+case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison {
+  override def typeInfo = {
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+
+  override def toString = s"$left !== $right"
+}
+
+case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString = s"$left > $right"
+}
+
+case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString = s"$left >= $right"
+}
+
+case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString = s"$left < $right"
+}
+
+case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString = s"$left <= $right"
+}
+
+case class IsNull(child: Expression) extends UnaryExpression {
+  def typeInfo = {
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+
+  override def toString = s"($child).isNull"
+}
+
+case class IsNotNull(child: Expression) extends UnaryExpression {
+  def typeInfo = {
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+
+  override def toString = s"($child).isNotNull"
+}

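The comparison nodes all evaluate to BOOLEAN_TYPE_INFO but differ in what they accept: EqualTo and NotEqualTo only require matching operand types, while the ordering comparisons demand numeric operands. A hedged sketch, under the same Literal assumption as above:

    EqualTo(Literal("a"), Literal("b")).typeInfo      // BOOLEAN_TYPE_INFO: same types are enough
    GreaterThan(Literal(3), Literal(2)).typeInfo      // BOOLEAN_TYPE_INFO: numeric operands
    GreaterThan(Literal("a"), Literal("b")).typeInfo  // throws ExpressionException: non-numeric operand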
