flink-commits mailing list archives

From rmetz...@apache.org
Subject [35/50] flink git commit: [FLINK-1712] Remove "flink-staging" module
Date Thu, 14 Jan 2016 16:16:32 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala
new file mode 100644
index 0000000..3b5459b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.typeinfo
+
+import org.apache.flink.api.common.operators.Operator
+import org.apache.flink.api.java.operators.SingleInputOperator
+import org.apache.flink.api.java.{DataSet => JavaDataSet}
+
+/**
+ * This is a logical operator that can hold a [[RenamingProxyTypeInfo]] for renaming some
+ * fields of a [[org.apache.flink.api.common.typeutils.CompositeType]]. At runtime this
+ * disappears since the translation method simply returns the input.
+ */
+class RenameOperator[T](
+    input: JavaDataSet[T],
+    renamingTypeInformation: RenamingProxyTypeInfo[T])
+  extends SingleInputOperator[T, T, RenameOperator[T]](input, renamingTypeInformation) {
+
+  override protected def translateToDataFlow(
+      input: Operator[T]): Operator[T] = input
+}
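
A minimal usage sketch (dataSet and the new field names are assumed for
illustration; the classes are the ones added in this commit):

    import org.apache.flink.api.common.typeutils.CompositeType
    import org.apache.flink.api.table.Row

    // dataSet: an org.apache.flink.api.java.DataSet[Row] with two fields, assumed in scope
    val compositeType = dataSet.getType.asInstanceOf[CompositeType[Row]]
    val proxy = new RenamingProxyTypeInfo[Row](compositeType, Array("userId", "amount"))
    val renamed = new RenameOperator[Row](dataSet, proxy)
    // translateToDataFlow returns its input unchanged, so the rename costs nothing at
    // runtime; downstream operators see the new field names through the proxy type.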

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
new file mode 100644
index 0000000..dd598ab
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.typeinfo
+
+import java.util
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType.{TypeComparatorBuilder,
+  FlatFieldDescriptor}
+import org.apache.flink.api.common.typeutils.{CompositeType, TypeSerializer}
+
+/**
+ * A TypeInformation that is used to rename fields of an underlying CompositeType. This
+ * allows the system to translate "as" Table API operations to a [[RenameOperator]]
+ * that does not get translated to a runtime operator.
+ */
+class RenamingProxyTypeInfo[T](
+    val tpe: CompositeType[T],
+    val fieldNames: Array[String])
+  extends CompositeType[T](tpe.getTypeClass) {
+
+  def getUnderlyingType: CompositeType[T] = tpe
+
+  if (tpe.getArity != fieldNames.length) {
+    throw new IllegalArgumentException(s"Number of field names '${fieldNames.mkString(",")}' and " +
+      s"number of fields in underlying type $tpe do not match.")
+  }
+
+  if (fieldNames.toSet.size != fieldNames.length) {
+    throw new IllegalArgumentException(s"New field names must be unique. " +
+      s"Names: ${fieldNames.mkString(",")}.")
+  }
+
+  override def getFieldIndex(fieldName: String): Int = {
+    val result = fieldNames.indexOf(fieldName)
+    if (result != fieldNames.lastIndexOf(fieldName)) {
+      -2
+    } else {
+      result
+    }
+  }
+  override def getFieldNames: Array[String] = fieldNames
+
+  override def isBasicType: Boolean = tpe.isBasicType
+
+  override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] =
+    tpe.createSerializer(executionConfig)
+
+  override def getArity: Int = tpe.getArity
+
+  override def isKeyType: Boolean = tpe.isKeyType
+
+  override def getTypeClass: Class[T] = tpe.getTypeClass
+
+  override def getGenericParameters: java.util.List[TypeInformation[_]] = tpe.getGenericParameters
+
+  override def isTupleType: Boolean = tpe.isTupleType
+
+  override def toString = {
+    s"RenamingType(type: ${tpe.getTypeClass.getSimpleName}; " +
+      s"fields: ${fieldNames.mkString(", ")})"
+  }
+
+  override def getTypeAt[X](pos: Int): TypeInformation[X] = tpe.getTypeAt(pos)
+
+  override def getTotalFields: Int = tpe.getTotalFields
+
+  override def createComparator(
+        logicalKeyFields: Array[Int],
+        orders: Array[Boolean],
+        logicalFieldOffset: Int,
+        executionConfig: ExecutionConfig) =
+    tpe.createComparator(logicalKeyFields, orders, logicalFieldOffset, executionConfig)
+
+  override def getFlatFields(
+      fieldExpression: String,
+      offset: Int,
+      result: util.List[FlatFieldDescriptor]): Unit = {
+    tpe.getFlatFields(fieldExpression, offset, result)
+  }
+
+  override def getTypeAt[X](fieldExpression: String): TypeInformation[X] = {
+    tpe.getTypeAt(fieldExpression)
+  }
+
+  override protected def createTypeComparatorBuilder(): TypeComparatorBuilder[T] = {
+    throw new RuntimeException("This method should never be called because createComparator is " +
+      "overwritten.")
+  }
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case renamingProxy: RenamingProxyTypeInfo[_] =>
+        renamingProxy.canEqual(this) &&
+        tpe.equals(renamingProxy.tpe) &&
+        fieldNames.sameElements(renamingProxy.fieldNames)
+      case _ => false
+    }
+  }
+
+  override def hashCode(): Int = {
+    31 * tpe.hashCode() + util.Arrays.hashCode(fieldNames.asInstanceOf[Array[AnyRef]])
+  }
+
+  override def canEqual(obj: Any): Boolean = {
+    obj.isInstanceOf[RenamingProxyTypeInfo[_]]
+  }
+}
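
A short sketch of the proxy's behavior, assuming a two-field Scala tuple as the
underlying type (the names "id" and "name" are illustrative):

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.common.typeutils.CompositeType
    import org.apache.flink.api.scala._

    val tupleType = createTypeInformation[(Int, String)]
      .asInstanceOf[CompositeType[(Int, String)]]
    val proxy = new RenamingProxyTypeInfo(tupleType, Array("id", "name"))

    proxy.getFieldIndex("name")                  // 1: resolved against the new names
    proxy.getArity                               // 2: delegated to the wrapped type
    proxy.createSerializer(new ExecutionConfig)  // delegated, so serialization is unchanged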

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
new file mode 100644
index 0000000..5e9613d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.typeinfo
+
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+
+/**
+ * Serializer for [[Row]].
+ */
+class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]])
+  extends TypeSerializer[Row] {
+
+  override def isImmutableType: Boolean = false
+
+  override def getLength: Int = -1
+
+  override def duplicate = this
+
+  override def createInstance: Row = {
+    new Row(fieldSerializers.length)
+  }
+
+  override def copy(from: Row, reuse: Row): Row = {
+    val len = fieldSerializers.length
+
+    if (from.productArity != len) {
+      throw new RuntimeException("Row arity of reuse and from do not match.")
+    }
+    var i = 0
+    while (i < len) {
+      val reuseField = reuse.productElement(i)
+      val fromField = from.productElement(i).asInstanceOf[AnyRef]
+      val copy = fieldSerializers(i).copy(fromField, reuseField)
+      reuse.setField(i, copy)
+      i += 1
+    }
+    reuse
+  }
+
+  override def copy(from: Row): Row = {
+    val len = fieldSerializers.length
+
+    if (from.productArity != len) {
+      throw new RuntimeException("Row arity of reuse and from do not match.")
+    }
+    val result = new Row(len)
+    var i = 0
+    while (i < len) {
+      val fromField = from.productElement(i).asInstanceOf[AnyRef]
+      val copy = fieldSerializers(i).copy(fromField)
+      result.setField(i, copy)
+      i += 1
+    }
+    result
+  }
+
+  override def serialize(value: Row, target: DataOutputView) {
+    val len = fieldSerializers.length
+    var i = 0
+    while (i < len) {
+      val serializer = fieldSerializers(i)
+      serializer.serialize(value.productElement(i), target)
+      i += 1
+    }
+  }
+
+  override def deserialize(reuse: Row, source: DataInputView): Row = {
+    val len = fieldSerializers.length
+
+    if (reuse.productArity != len) {
+      throw new RuntimeException("Row arity of reuse and fields do not match.")
+    }
+
+    var i = 0
+    while (i < len) {
+      val field = reuse.productElement(i).asInstanceOf[AnyRef]
+      reuse.setField(i, fieldSerializers(i).deserialize(field, source))
+      i += 1
+    }
+    reuse
+  }
+
+  override def deserialize(source: DataInputView): Row = {
+    val len = fieldSerializers.length
+
+    val result = new Row(len)
+    var i = 0
+    while (i < len) {
+      result.setField(i, fieldSerializers(i).deserialize(source))
+      i += 1
+    }
+    result
+  }
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit = {
+    val len = fieldSerializers.length
+    var i = 0
+    while (i < len) {
+      fieldSerializers(i).copy(source, target)
+      i += 1
+    }
+  }
+
+  override def equals(any: scala.Any): Boolean = {
+    any match {
+      case otherRS: RowSerializer =>
+        otherRS.canEqual(this) &&
+          fieldSerializers.sameElements(otherRS.fieldSerializers)
+      case _ => false
+    }
+  }
+
+  override def canEqual(obj: scala.Any): Boolean = {
+    obj.isInstanceOf[RowSerializer]
+  }
+
+  override def hashCode(): Int = {
+    java.util.Arrays.hashCode(fieldSerializers.asInstanceOf[Array[AnyRef]])
+  }
+}
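
As a usage sketch: the serializer is composed from one serializer per field, and
copies run field by field (the Int/String row layout here is assumed):

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.common.typeutils.TypeSerializer

    val config = new ExecutionConfig
    val fieldSerializers = Array(
      BasicTypeInfo.INT_TYPE_INFO.createSerializer(config).asInstanceOf[TypeSerializer[Any]],
      BasicTypeInfo.STRING_TYPE_INFO.createSerializer(config).asInstanceOf[TypeSerializer[Any]])
    val rowSerializer = new RowSerializer(fieldSerializers)

    val row = rowSerializer.createInstance    // arity 2, fields still unset
    row.setField(0, 42)
    row.setField(1, "hello")
    val deepCopy = rowSerializer.copy(row)    // per-field copy through fieldSerializers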

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
new file mode 100644
index 0000000..db3c881
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.typeinfo
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+
+/**
+ * TypeInformation for [[Row]].
+ */
+class RowTypeInfo(
+    fieldTypes: Seq[TypeInformation[_]],
+    fieldNames: Seq[String])
+  extends CaseClassTypeInfo[Row](classOf[Row], Array(), fieldTypes, fieldNames) {
+
+  def this(fields: Seq[Expression]) = this(fields.map(_.typeInfo), fields.map(_.name))
+
+  if (fieldNames.toSet.size != fieldNames.size) {
+    throw new IllegalArgumentException("Field names must be unique.")
+  }
+
+  override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Row] = {
+    val fieldSerializers: Array[TypeSerializer[Any]] = new Array[TypeSerializer[Any]](getArity)
+    for (i <- 0 until getArity) {
+      fieldSerializers(i) = this.types(i).createSerializer(executionConfig)
+        .asInstanceOf[TypeSerializer[Any]]
+    }
+
+    new RowSerializer(fieldSerializers)
+  }
+}
+
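
A minimal construction sketch (field types and names chosen for illustration):

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo

    val rowType = new RowTypeInfo(
      Seq(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
      Seq("id", "name"))
    // createSerializer builds one serializer per field and wraps them in a RowSerializer:
    val serializer = rowType.createSerializer(new ExecutionConfig)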

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
new file mode 100644
index 0000000..dda6265
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.common.functions.GroupReduceFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.examples.java.graph.util.PageRankData
+import org.apache.flink.util.Collector
+
+import _root_.scala.collection.JavaConverters._
+
+/**
+* A basic implementation of the Page Rank algorithm using a bulk iteration.
+*
+* This implementation requires a set of pages and a set of directed links as input and works as
+* follows.
+*
+* In each iteration, the rank of every page is evenly distributed to all pages it points to. Each
+* page collects the partial ranks of all pages that point to it, sums them up, and applies a
+* dampening factor to the sum. The result is the new rank of the page. A new iteration is started
+* with the new ranks of all pages. This implementation terminates after a fixed number of
+* iterations. For background, see the Wikipedia entry on the
+* [[http://en.wikipedia.org/wiki/Page_rank Page Rank algorithm]].
+*
+* Input files are plain text files and must be formatted as follows:
+*
+*  - Pages are represented by a (long) ID, separated by new-line characters.
+*    For example `"1\n2\n12\n42\n63"` gives five pages with IDs 1, 2, 12, 42, and 63.
+*  - Links are represented as pairs of page IDs, separated by space characters. Links
+*    are separated by new-line characters.
+*    For example `"1 2\n2 12\n1 12\n42 63"` gives four (directed) links (1)->(2), (2)->(12),
+*    (1)->(12), and (42)->(63). For this simple implementation it is required that each page has
+*    at least one incoming and one outgoing link (a page can point to itself).
+*
+* Usage:
+* {{{
+*   PageRankTable <pages path> <links path> <output path> <num pages> <num iterations>
+* }}}
+*
+* If no parameters are provided, the program is run with default data from
+* [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations.
+*
+* This example shows how to use:
+*
+*  - Bulk Iterations
+*  - Table API expressions
+*/
+object PageRankTable {
+
+  private final val DAMPENING_FACTOR: Double = 0.85
+  private final val EPSILON: Double = 0.0001
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set up execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // read input data
+    val pagesWithRanks = getPagesDataSet(env).map { p => (p, 1.0 / numPages) }
+      .as('pageId, 'rank)
+
+    val links = getLinksDataSet(env)
+
+    // build adjacency list from link input
+    val adjacencyLists = links
+      .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, AdjacencyList] {
+
+        override def reduce(
+            values: _root_.java.lang.Iterable[Link],
+            out: Collector[AdjacencyList]): Unit = {
+          var outputId = -1L
+          val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId }
+          out.collect(new AdjacencyList(outputId, outputList.toArray))
+        }
+
+      }).as('sourceId, 'targetIds)
+
+    // start iteration
+    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
+      currentRanks =>
+        val newRanks = currentRanks.toTable
+          // distribute ranks to target pages
+          .join(adjacencyLists).where('pageId === 'sourceId)
+          .select('rank, 'targetIds).toDataSet[RankOutput]
+          .flatMap {
+            (in, out: Collector[(Long, Double)]) =>
+              val targets = in.targetIds
+              val len = targets.length
+              targets foreach { t => out.collect((t, in.rank / len )) }
+          }
+          .as('pageId, 'rank)
+          // collect ranks and sum them up
+          .groupBy('pageId).select('pageId, 'rank.sum as 'rank)
+          // apply dampening factor
+          .select(
+            'pageId,
+            ('rank * DAMPENING_FACTOR) + (Literal(1) - DAMPENING_FACTOR) / numPages as 'rank)
+
+
+        val termination = currentRanks.toTable
+          .as('curId, 'curRank).join(newRanks.as('newId, 'newRank))
+          .where('curId === 'newId && ('curRank - 'newRank).abs > EPSILON)
+
+        (newRanks, termination)
+    }
+
+    val result = finalRanks
+
+    // emit result
+    if (fileOutput) {
+      result.writeAsCsv(outputPath, "\n", " ")
+      // execute program
+      env.execute("Expression PageRank Example")
+    } else {
+      // execute program and print result
+      result.print()
+    }
+  }
+
+  // *************************************************************************
+  //     USER TYPES
+  // *************************************************************************
+
+  case class Link(sourceId: Long, targetId: Long)
+
+  case class Page(pageId: Long, rank: Double)
+
+  case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
+
+  case class RankOutput(rank: Double, targetIds: Array[Long])
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      fileOutput = true
+      if (args.length == 5) {
+        pagesInputPath = args(0)
+        linksInputPath = args(1)
+        outputPath = args(2)
+        numPages = args(3).toLong
+        maxIterations = args(4).toInt
+      } else {
+        System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num " +
+          "pages> <num iterations>")
+        false
+      }
+    } else {
+      System.out.println("Executing PageRank Basic example with default parameters and built-in " +
+        "default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("  Usage: PageRankBasic <pages path> <links path> <output path> <num " +
+        "pages> <num iterations>")
+
+      numPages = PageRankData.getNumberOfPages
+    }
+    true
+  }
+
+  private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
+    if (fileOutput) {
+      env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", lineDelimiter = "\n")
+        .map(x => x._1)
+    } else {
+      env.generateSequence(1, 15)
+    }
+  }
+
+  private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = {
+    if (fileOutput) {
+      env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ",
+        includedFields = Array(0, 1))
+    } else {
+      val edges = PageRankData.EDGES.map { case Array(v1, v2) => Link(v1.asInstanceOf[Long],
+        v2.asInstanceOf[Long])}
+      env.fromCollection(edges)
+    }
+  }
+
+  private var fileOutput: Boolean = false
+  private var pagesInputPath: String = null
+  private var linksInputPath: String = null
+  private var outputPath: String = null
+  private var numPages: Double = 0
+  private var maxIterations: Int = 10
+
+}
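
For a single page, the dampened update in the select(...) call above works out
as in this worked sketch (the incoming partial ranks are invented; numPages = 15
matches the built-in default data):

    // newRank = DAMPENING_FACTOR * sum(partial ranks) + (1 - DAMPENING_FACTOR) / numPages
    val incoming = Seq(0.05, 0.10, 0.025)   // partial ranks from pages pointing here
    val newRank = 0.85 * incoming.sum + (1 - 0.85) / 15.0
    // = 0.85 * 0.175 + 0.15 / 15 = 0.14875 + 0.01 = 0.15875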

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
new file mode 100644
index 0000000..63dddc9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala
+
+import org.apache.flink.streaming.api.scala._
+
+import org.apache.flink.api.scala.table._
+
+import scala.Stream._
+import scala.math._
+import scala.language.postfixOps
+import scala.util.Random
+
+/**
+ * Simple example for demonstrating the use of the Table API with Flink Streaming.
+ */
+object StreamingTableFilter {
+
+  case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long) extends Serializable
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val cars = genCarStream().toTable
+      .filter('carId === 0)
+      .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time)
+      .toDataStream[CarEvent]
+
+    cars.print()
+
+    StreamExecutionEnvironment.getExecutionEnvironment.execute("StreamingTableFilter")
+
+  }
+
+  def genCarStream(): DataStream[CarEvent] = {
+
+    def nextSpeed(carEvent: CarEvent): CarEvent = {
+      val next =
+        if (Random.nextBoolean()) min(100, carEvent.speed + 5) else max(0, carEvent.speed - 5)
+      CarEvent(carEvent.carId, next, carEvent.distance + next / 3.6d, System.currentTimeMillis)
+    }
+
+    def carStream(speeds: Stream[CarEvent]): Stream[CarEvent] = {
+      Thread.sleep(1000)
+      speeds.append(carStream(speeds.map(nextSpeed)))
+    }
+
+    carStream(range(0, numOfCars).map(CarEvent(_, 50, 0, System.currentTimeMillis())))
+  }
+
+  def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      if (args.length == 3) {
+        numOfCars = args(0).toInt
+        evictionSec = args(1).toInt
+        triggerMeters = args(2).toDouble
+        true
+      }
+      else {
+        System.err.println("Usage: TopSpeedWindowing <numCars> <evictSec> <triggerMeters>")
+        false
+      }
+    } else {
+      true
+    }
+  }
+
+  var numOfCars = 2
+  var evictionSec = 10
+  var triggerMeters = 50d
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
new file mode 100644
index 0000000..f527a3c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+
+/**
+ * This program implements a modified version of the TPC-H query 3. The
+ * example demonstrates how to assign names to fields using the Table API's as() expression.
+ * The original query can be found at
+ * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf)
+ * (page 29).
+ *
+ * This program implements the following SQL equivalent:
+ *
+ * {{{
+ * SELECT 
+ *      l_orderkey, 
+ *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
+ *      o_orderdate, 
+ *      o_shippriority 
+ * FROM customer, 
+ *      orders, 
+ *      lineitem 
+ * WHERE
+ *      c_mktsegment = '[SEGMENT]' 
+ *      AND c_custkey = o_custkey
+ *      AND l_orderkey = o_orderkey
+ *      AND o_orderdate < date '[DATE]'
+ *      AND l_shipdate > date '[DATE]'
+ * GROUP BY
+ *      l_orderkey, 
+ *      o_orderdate, 
+ *      o_shippriority;
+ * }}}
+ *
+ * Compared to the original TPC-H query this version does not sort the result by revenue
+ * and orderdate.
+ *
+ * Input files are plain text CSV files using the pipe character ('|') as field separator 
+ * as generated by the TPC-H data generator which is available at 
+ * [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/).
+ *
+ * Usage: 
+ * {{{
+ * TPCHQuery3Table <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
+ * }}}
+ *  
+ * This example shows how to use:
+ *  - Table API expressions
+ *
+ */
+object TPCHQuery3Table {
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set filter date
+    val dateFormat = new _root_.java.text.SimpleDateFormat("yyyy-MM-dd")
+    val date = dateFormat.parse("1995-03-12")
+    
+    // get execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val lineitems = getLineitemDataSet(env)
+      .filter( l => dateFormat.parse(l.shipDate).after(date) )
+      .as('id, 'extdPrice, 'discount, 'shipDate)
+
+    val customers = getCustomerDataSet(env)
+      .as('id, 'mktSegment)
+      .filter( 'mktSegment === "AUTOMOBILE" )
+
+    val orders = getOrdersDataSet(env)
+      .filter( o => dateFormat.parse(o.orderDate).before(date) )
+      .as('orderId, 'custId, 'orderDate, 'shipPrio)
+
+    val items =
+      orders.join(customers)
+        .where('custId === 'id)
+        .select('orderId, 'orderDate, 'shipPrio)
+      .join(lineitems)
+        .where('orderId === 'id)
+        .select(
+          'orderId,
+          'extdPrice * (Literal(1.0f) - 'discount) as 'revenue,
+          'orderDate,
+          'shipPrio)
+
+    val result = items
+      .groupBy('orderId, 'orderDate, 'shipPrio)
+      .select('orderId, 'revenue.sum, 'orderDate, 'shipPrio)
+
+    // emit result
+    result.writeAsCsv(outputPath, "\n", "|")
+
+    // execute program
+    env.execute("Scala TPCH Query 3 (Expression) Example")
+  }
+  
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+  
+  case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String)
+  case class Customer(id: Long, mktSegment: String)
+  case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+  
+  private var lineitemPath: String = null
+  private var customerPath: String = null
+  private var ordersPath: String = null
+  private var outputPath: String = null
+
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length == 4) {
+      lineitemPath = args(0)
+      customerPath = args(1)
+      ordersPath = args(2)
+      outputPath = args(3)
+      true
+    } else {
+      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
+          " Due to legal restrictions, we can not ship generated data.\n" +
+          " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
+          " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path>" + 
+                             "<orders-csv path> <result path>");
+      false
+    }
+  }
+  
+  private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
+    env.readCsvFile[Lineitem](
+        lineitemPath,
+        fieldDelimiter = "|",
+        includedFields = Array(0, 5, 6, 10) )
+  }
+
+  private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {
+    env.readCsvFile[Customer](
+        customerPath,
+        fieldDelimiter = "|",
+        includedFields = Array(0, 6) )
+  }
+  
+  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
+    env.readCsvFile[Order](
+        ordersPath,
+        fieldDelimiter = "|",
+        includedFields = Array(0, 1, 4, 7) )
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
new file mode 100644
index 0000000..cac9590
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+
+/**
+ * Simple example for demonstrating the use of the Table API for a Word Count.
+ */
+object WordCountTable {
+
+  case class WC(word: String, count: Int)
+
+  def main(args: Array[String]): Unit = {
+
+    // set up execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
+    val expr = input.toTable
+    val result = expr
+      .groupBy('word)
+      .select('word, 'count.sum as 'count)
+      .toDataSet[WC]
+
+    result.print()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/Node.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/Node.java b/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/Node.java
new file mode 100644
index 0000000..9152260
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/Node.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.explain;
+
+import java.util.List;
+
+public class Node {
+	private int id;
+	private String type;
+	private String pact;
+	private String contents;
+	private int parallelism;
+	private String driver_strategy;
+	private List<Predecessors> predecessors;
+	private List<Global_properties> global_properties;
+	private List<LocalProperty> local_properties;
+	private List<Estimates> estimates;
+	private List<Costs> costs;
+	private List<Compiler_hints> compiler_hints;
+
+	public int getId() {
+		return id;
+	}
+	public String getType() {
+		return type;
+	}
+	public String getPact() {
+		return pact;
+	}
+	public String getContents() {
+		return contents;
+	}
+	public int getParallelism() {
+		return parallelism;
+	}
+	public String getDriver_strategy() {
+		return driver_strategy;
+	}
+	public List<Predecessors> getPredecessors() {
+		return predecessors;
+	}
+	public List<Global_properties> getGlobal_properties() {
+		return global_properties;
+	}
+	public List<LocalProperty> getLocal_properties() {
+		return local_properties;
+	}
+	public List<Estimates> getEstimates() {
+		return estimates;
+	}
+	public List<Costs> getCosts() {
+		return costs;
+	}
+	public List<Compiler_hints> getCompiler_hints() {
+		return compiler_hints;
+	}
+}
+
+class Predecessors {
+	private String ship_strategy;
+	private String exchange_mode;
+
+	public String getShip_strategy() {
+		return ship_strategy;
+	}
+	public String getExchange_mode() {
+		return exchange_mode;
+	}
+}
+
+class Global_properties {
+	private String name;
+	private String value;
+
+	public String getValue() {
+		return value;
+	}
+	public String getName() {
+		return name;
+	}
+}
+
+class LocalProperty {
+	private String name;
+	private String value;
+
+	public String getValue() {
+		return value;
+	}
+	public String getName() {
+		return name;
+	}
+}
+
+class Estimates {
+	private String name;
+	private String value;
+
+	public String getValue() {
+		return value;
+	}
+	public String getName() {
+		return name;
+	}
+}
+
+class Costs {
+	private String name;
+	private String value;
+
+	public String getValue() {
+		return value;
+	}
+	public String getName() {
+		return name;
+	}
+}
+
+class Compiler_hints {
+	private String name;
+	private String value;
+
+	public String getValue() {
+		return value;
+	}
+	public String getName() {
+		return name;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/PlanJsonParser.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/PlanJsonParser.java b/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/PlanJsonParser.java
new file mode 100644
index 0000000..31a7cd68
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/PlanJsonParser.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.explain;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+public class PlanJsonParser {
+
+	public static String getSqlExecutionPlan(String t, Boolean extended) throws Exception {
+		ObjectMapper objectMapper = new ObjectMapper();
+
+		//not every node is the same; ignore unknown fields
+		objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+		PlanTree tree = objectMapper.readValue(t, PlanTree.class);
+		LinkedHashMap<String, Integer> map = new LinkedHashMap<>();
+		StringWriter sw = new StringWriter();
+		PrintWriter pw = new PrintWriter(sw);
+		int tabCount = 0;
+
+		for (int index = 0; index < tree.getNodes().size(); index++) {
+			Node tempNode = tree.getNodes().get(index);
+
+			//inputs of an operation such as join or union sit at the same level; keep the same indent
+			if ((tempNode.getPact().equals("Data Source")) && (map.containsKey(tempNode.getPact()))) {
+				tabCount = map.get(tempNode.getPact());
+			}
+			else {
+				map.put(tempNode.getPact(), tabCount);
+			}
+
+			printTab(tabCount, pw);
+			pw.print("Stage " + tempNode.getId() + " : " + tempNode.getPact() + "\n");
+
+			printTab(tabCount + 1, pw);
+			String content = tempNode.getContents();
+
+			//drop the hash code of the object instance
+			int dele = tempNode.getContents().indexOf("@");
+			if (dele > -1) content = tempNode.getContents().substring(0, dele);
+			
+			//replace the content if the node is a data source so that unit
+			//tests pass: the Java and Scala APIs describe their input
+			//elements differently
+			if (tempNode.getPact().equals("Data Source"))
+				content = "collect elements with CollectionInputFormat";
+			pw.print("content : " + content + "\n");
+
+			List<Predecessors> predecessors = tempNode.getPredecessors();
+			if (predecessors != null) {
+				printTab(tabCount + 1, pw);
+				pw.print("ship_strategy : " + predecessors.get(0).getShip_strategy() + "\n");
+
+				printTab(tabCount + 1, pw);
+				pw.print("exchange_mode : " + predecessors.get(0).getExchange_mode() + "\n");
+			}
+
+			if (tempNode.getDriver_strategy() != null) {
+				printTab(tabCount + 1, pw);
+				pw.print("driver_strategy : " + tempNode.getDriver_strategy() + "\n");
+			}
+
+			printTab(tabCount + 1, pw);
+			pw.print(tempNode.getGlobal_properties().get(0).getName() + " : "
+					+ tempNode.getGlobal_properties().get(0).getValue() + "\n");
+
+			if (extended) {
+				List<Global_properties> globalProperties = tempNode.getGlobal_properties();
+				for (int i = 1; i < globalProperties.size(); i++) {
+					printTab(tabCount + 1, pw);
+					pw.print(globalProperties.get(i).getName() + " : "
+					+ globalProperties.get(i).getValue() + "\n");
+				}
+
+				List<LocalProperty> localProperties = tempNode.getLocal_properties();
+				for (int i = 0; i < localProperties.size(); i++) {
+					printTab(tabCount + 1, pw);
+					pw.print(localProperties.get(i).getName() + " : "
+					+ localProperties.get(i).getValue() + "\n");
+				}
+
+				List<Estimates> estimates = tempNode.getEstimates();
+				for (int i = 0; i < estimates.size(); i++) {
+					printTab(tabCount + 1, pw);
+					pw.print(estimates.get(i).getName() + " : "
+					+ estimates.get(i).getValue() + "\n");
+				}
+
+				List<Costs> costs = tempNode.getCosts();
+				for (int i = 0; i < costs.size(); i++) {
+					printTab(tabCount + 1, pw);
+					pw.print(costs.get(i).getName() + " : "
+					+ costs.get(i).getValue() + "\n");
+				}
+
+				List<Compiler_hints> compilerHintses = tempNode.getCompiler_hints();
+				for (int i = 0; i < compilerHintses.size(); i++) {
+					printTab(tabCount + 1, pw);
+					pw.print(compilerHintses.get(i).getName() + " : "
+					+ compilerHintses.get(i).getValue() + "\n");
+				}
+			}
+			tabCount++;
+			pw.print("\n");
+		}
+		pw.close();
+		return sw.toString();
+	}
+
+	private static void printTab(int tabCount, PrintWriter pw) {
+		for (int i = 0; i < tabCount; i++)
+			pw.print("\t");
+	}
+}
+
+class PlanTree {
+	private List<Node> nodes;
+
+	public List<Node> getNodes() {
+		return nodes;
+	}
+}
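
For reference, a minimal sketch of the JSON shape these classes bind to, fed
through the parser (property names follow Node.java above; the values are
invented):

    val plan =
      """{"nodes": [{
        |  "id": 1, "type": "source", "pact": "Data Source",
        |  "contents": "collect elements with CollectionInputFormat",
        |  "parallelism": 1,
        |  "global_properties": [{"name": "Partitioning", "value": "RANDOM_PARTITIONED"}]
        |}]}""".stripMargin
    val text = PlanJsonParser.getSqlExecutionPlan(plan, false)
    // text begins with "Stage 1 : Data Source" followed by the indented properties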

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
new file mode 100644
index 0000000..bdebfb1
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.table.ExpressionException;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class AggregationsITCase extends MultipleProgramsTestBase {
+
+
+	public AggregationsITCase(TestExecutionMode mode){
+		super(mode);
+	}
+
+	@Test
+	public void testAggregationTypes() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		Table table = tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
+
+		Table result = table.select("f0.sum, f0.min, f0.max, f0.count, f0.avg");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "231,1,21,21,11";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testAggregationOnNonExistingField() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		Table table =
+				tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
+
+		Table result =
+				table.select("foo.avg");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testWorkingAggregationDataTypes() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
+				env.fromElements(
+						new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
+						new Tuple7<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao"));
+
+		Table table =
+				tableEnv.fromDataSet(input);
+
+		Table result =
+				table.select("f0.avg, f1.avg, f2.avg, f3.avg, f4.avg, f5.avg, f6.count");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1,1,1,1,1.5,1.5,2";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAggregationWithArithmetic() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSource<Tuple2<Float, String>> input =
+				env.fromElements(
+						new Tuple2<>(1f, "Hello"),
+						new Tuple2<>(2f, "Ciao"));
+
+		Table table =
+				tableEnv.fromDataSet(input);
+
+		Table result =
+				table.select("(f0 + 2).avg + 2, f1.count + \" THE COUNT\"");
+
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "5.5,2 THE COUNT";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAggregationWithTwoCount() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSource<Tuple2<Float, String>> input =
+			env.fromElements(
+				new Tuple2<>(1f, "Hello"),
+				new Tuple2<>(2f, "Ciao"));
+
+		Table table =
+			tableEnv.fromDataSet(input);
+
+		Table result =
+			table.select("f0.count, f1.count");
+
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "2,2";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testNonWorkingDataTypes() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));
+
+		Table table =
+				tableEnv.fromDataSet(input);
+
+		Table result =
+				table.select("f1.sum");
+
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testNoNestedAggregation() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));
+
+		Table table =
+				tableEnv.fromDataSet(input);
+
+		Table result =
+				table.select("f0.sum.sum");
+
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
+	}
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
new file mode 100644
index 0000000..f6ab54e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.table.ExpressionException;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class AsITCase extends MultipleProgramsTestBase {
+
+
+	public AsITCase(TestExecutionMode mode){
+		super(mode);
+	}
+
+	@Test
+	public void testAs() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		Table table =
+				tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c");
+
+		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+				"how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+				"Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+				"Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+				"Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+				"6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testAsWithTooFewFields() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		Table table =
+				tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b");
+
+		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testAsWithTooManyFields() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		Table table =
+				tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
+
+		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testAsWithAmbiguousFields() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
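+		// The name "b" is used twice; ambiguous field names are rejected.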
+		Table table =
+				tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
+
+		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testAsWithNonFieldReference1() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
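+		// "a + 1" is an expression, not a plain field reference, and is not allowed when naming fields.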
+		Table table =
+				tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
+
+		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testAsWithNonFieldReference2() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
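+		// "a as foo" is an alias expression, not a plain field reference, and is likewise rejected here.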
+		Table table =
+				tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b," +
+						" c");
+
+		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
new file mode 100644
index 0000000..7e9e3dc
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class CastingITCase extends MultipleProgramsTestBase {
+
+	public CastingITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testAutoCastToString() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
+				env.fromElements(new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"));
+
+		Table table =
+				tableEnv.fromDataSet(input);
+
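+		// Adding a String literal to each numeric field auto-casts the field to String and concatenates.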
+		Table result = table.select(
+				"f0 + 'b', f1 + 's', f2 + 'i', f3 + 'L', f4 + 'f', f5 + \"d\"");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1b,1s,1i,1L,1.0f,1.0d";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testNumericAutocastInArithmetic() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
+				env.fromElements(new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"));
+
+		Table table =
+				tableEnv.fromDataSet(input);
+
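+		// Mixed-type arithmetic auto-casts the narrower operand to the wider type before evaluating.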
+		Table result = table.select("f0 + 1, f1 +" +
+				" 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "2,2,2,2.0,2.0,2.0";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testNumericAutocastInComparison() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
+				env.fromElements(
+						new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
+						new Tuple7<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Hello"));
+
+		Table table =
+				tableEnv.fromDataSet(input, "a,b,c,d,e,f,g");
+
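+		// Comparisons between different numeric types rely on the same automatic widening.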
+		Table result = table
+				.filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "2,2,2,2,2.0,2.0,Hello";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testCastFromString() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSource<Tuple3<String, String, String>> input =
+				env.fromElements(new Tuple3<>("1", "true", "2.0"));
+
+		Table table =
+				tableEnv.fromDataSet(input);
+
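+		// Explicit .cast(TYPE) parses the String values into the requested numeric and boolean types.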
+		Table result = table.select(
+				"f0.cast(BYTE), f0.cast(SHORT), f0.cast(INT), f0.cast(LONG), f2.cast(DOUBLE), f2.cast(FLOAT), f1.cast(BOOL)");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1,1,1,1,2.0,2.0,true\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testCastDateFromString() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSource<Tuple4<String, String, String, String>> input =
+				env.fromElements(new Tuple4<>("2011-05-03", "15:51:36", "2011-05-03 15:51:36.000", "1446473775"));
+
+		Table table =
+				tableEnv.fromDataSet(input);
+
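+		// Date-only and time-only strings fill the missing part with epoch defaults; a purely numeric
+		// string is interpreted as milliseconds since the epoch (see the expected values below).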
+		Table result = table
+				.select("f0.cast(DATE) AS f0, f1.cast(DATE) AS f1, f2.cast(DATE) AS f2, f3.cast(DATE) AS f3")
+				.select("f0.cast(STRING), f1.cast(STRING), f2.cast(STRING), f3.cast(STRING)");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "2011-05-03 00:00:00.000,1970-01-01 15:51:36.000,2011-05-03 15:51:36.000," +
+				"1970-01-17 17:47:53.775\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testCastDateToStringAndLong() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSource<Tuple2<String, String>> input =
+			env.fromElements(new Tuple2<>("2011-05-03 15:51:36.000", "1304437896000"));
+
+		Table table =
+			tableEnv.fromDataSet(input);
+
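+		// A DATE cast to LONG yields milliseconds since the epoch, so both columns round-trip to the
+		// same String and Long values.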
+		Table result = table
+			.select("f0.cast(DATE) AS f0, f1.cast(DATE) AS f1")
+			.select("f0.cast(STRING), f0.cast(LONG), f1.cast(STRING), f1.cast(LONG)");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "2011-05-03 15:51:36.000,1304437896000,2011-05-03 15:51:36.000,1304437896000\n";
+		compareResultAsText(results, expected);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
new file mode 100644
index 0000000..c9bba62
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.table.ExpressionException;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class ExpressionsITCase extends MultipleProgramsTestBase {
+
+	public ExpressionsITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testArithmetic() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSource<Tuple2<Integer, Integer>> input =
+				env.fromElements(new Tuple2<>(5, 10));
+
+		Table table =
+				tableEnv.fromDataSet(input, "a, b");
+
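+		// Note the integer division: with a = 5, "a / 2" evaluates to 2.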
+		Table result = table.select(
+				"a - 5, a + 5, a / 2, a * 2, a % 2, -a");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "0,10,2,10,1,-5";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testLogic() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSource<Tuple2<Integer, Boolean>> input =
+				env.fromElements(new Tuple2<>(5, true));
+
+		Table table =
+				tableEnv.fromDataSet(input, "a, b");
+
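+		// Logical operators combine the Boolean field b with literals; !b negates it.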
+		Table result = table.select(
+				"b && true, b && false, b || false, !b");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "true,false,true,false";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testComparisons() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSource<Tuple3<Integer, Integer, Integer>> input =
+				env.fromElements(new Tuple3<>(5, 5, 4));
+
+		Table table =
+				tableEnv.fromDataSet(input, "a, b, c");
+
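+		// isNull/isNotNull are postfix expressions; a is non-null here, so they yield false and true.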
+		Table result = table.select(
+				"a > c, a >= b, a < c, a.isNull, a.isNotNull");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "true,true,false,false,true";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testBitwiseOperation() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSource<Tuple2<Byte, Byte>> input =
+				env.fromElements(new Tuple2<>((byte) 3, (byte) 5));
+
+		Table table =
+				tableEnv.fromDataSet(input, "a, b");
+
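+		// With a = 3 and b = 5: 3 & 5 = 1, 3 | 5 = 7, 3 ^ 5 = 6, and ~3 = -4 (two's complement).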
+		Table result = table.select(
+				"a & b, a | b, a ^ b, ~a");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1,7,6,-4";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testBitwiseWithAutocast() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSource<Tuple2<Integer, Byte>> input =
+				env.fromElements(new Tuple2<>(3, (byte) 5));
+
+		Table table =
+				tableEnv.fromDataSet(input, "a, b");
+
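+		// The Byte operand is auto-cast to Int before the bitwise operation; results match the pure-byte case.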
+		Table result = table.select(
+				"a & b, a | b, a ^ b, ~a");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1,7,6,-4";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testBitwiseWithNonWorkingAutocast() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSource<Tuple2<Float, Byte>> input =
+				env.fromElements(new Tuple2<>(3.0f, (byte) 5));
+
+		Table table =
+				tableEnv.fromDataSet(input, "a, b");
+
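+		// Bitwise operators are not defined for Float operands; this is expected to fail with an
+		// ExpressionException.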
+		Table result =
+				table.select("a & b, a | b, a ^ b, ~a");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
new file mode 100644
index 0000000..44e0def
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class FilterITCase extends MultipleProgramsTestBase {
+
+	public FilterITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testAllRejectingFilter() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+		Table table =
+				tableEnv.fromDataSet(input, "a, b, c");
+
+		Table result = table
+				.filter("false");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAllPassingFilter() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+		Table table =
+				tableEnv.fromDataSet(input, "a, b, c");
+
+		Table result = table
+				.filter("true");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+				"how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+				"Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+				"Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+				"Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+				"6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testFilterOnIntegerTupleField() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+		Table table =
+				tableEnv.fromDataSet(input, "a, b, c");
+
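+		// Keeps only the rows whose first field is even.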
+		Table result = table
+				.filter(" a % 2 = 0 ");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+				"Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+				"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testNotEquals() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+		Table table =
+				tableEnv.fromDataSet(input, "a, b, c");
+
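+		// "!( a % 2 <> 0 )" is equivalent to "a % 2 = 0": <> is not-equals and ! negates the predicate.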
+		Table result = table
+				.filter("!( a % 2 <> 0 ) ");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+				"Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+				"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testIntegerBiggerThan128() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(new Tuple3<>(300, 1L, "Hello"));
+
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
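+		// Regression guard: a literal above 127 catches comparisons that would only work inside the
+		// byte range (e.g. boxed-Integer identity comparison).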
+		Table result = table.filter("a = 300 ");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "300,1,Hello\n";
+		compareResultAsText(results, expected);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
new file mode 100644
index 0000000..f5c9185
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.table.ExpressionException;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
+
+	public GroupedAggregationsITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testGroupingOnNonExistentField() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+		Table table =
+				tableEnv.fromDataSet(input, "a, b, c");
+
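+		// "foo" was never defined as a field name; grouping on it is expected to fail with an
+		// ExpressionException.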
+		Table result = table
+				.groupBy("foo").select("a.avg");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testGroupedAggregate() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+		Table table =
+				tableEnv.fromDataSet(input, "a, b, c");
+
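+		// Sums field a within each group of b, e.g. group b = 2 contains a = 2 and a = 3, giving 5.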
+		Table result = table
+				.groupBy("b").select("b, a.sum");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testGroupingKeyForwardIfNotUsed() throws Exception {
+
+		// the grouping key needs to be forwarded to the intermediate DataSet, even
+		// if we don't want the key in the output
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+		Table table =
+				tableEnv.fromDataSet(input, "a, b, c");
+
+		Table result = table
+				.groupBy("b").select("a.sum");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testGroupNoAggregation() throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+		Table table =
+			tableEnv.fromDataSet(input, "a, b, c");
+
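+		// Grouping on "b, d" and selecting only "b" without an aggregate acts as a distinct projection.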
+		Table result = table
+			.groupBy("b").select("a.sum as d, b").groupBy("b, d").select("b");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+
+		String expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n";
+		List<Row> results = ds.collect();
+		compareResultAsText(results, expected);
+	}
+}
+

