flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [3/6] flink git commit: [FLINK-947] Add a declarative Expression API
Date Mon, 23 Feb 2015 15:05:27 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala
new file mode 100644
index 0000000..1803bdd
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.expressions.tree.Literal
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+
+/**
+ * This program implements a modified version of the TPC-H query 3. The
+ * example demonstrates how to name fields using case classes and the `as` operation.
+ * The original query can be found at
+ * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf)
+ * (page 29).
+ *
+ * This program implements the following SQL equivalent:
+ *
+ * {{{
+ * SELECT 
+ *      l_orderkey, 
+ *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
+ *      o_orderdate, 
+ *      o_shippriority 
+ * FROM customer, 
+ *      orders, 
+ *      lineitem 
+ * WHERE
+ *      c_mktsegment = '[SEGMENT]' 
+ *      AND c_custkey = o_custkey
+ *      AND l_orderkey = o_orderkey
+ *      AND o_orderdate < date '[DATE]'
+ *      AND l_shipdate > date '[DATE]'
+ * GROUP BY
+ *      l_orderkey, 
+ *      o_orderdate, 
+ *      o_shippriority;
+ * }}}
+ *
+ * Compared to the original TPC-H query this version does not sort the result by revenue
+ * and orderdate.
+ *
+ * Input files are plain text CSV files using the pipe character ('|') as field separator 
+ * as generated by the TPC-H data generator which is available at 
+ * [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/).
+ *
+ * Usage: 
+ * {{{
+ * TPCHQuery3Expression <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
+ * }}}
+ *  
+ * This example shows how to use:
+ *  - Expression Operations
+ *
+ */
+object TPCHQuery3Expression {
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set filter date
+    val dateFormat = new _root_.java.text.SimpleDateFormat("yyyy-MM-dd")
+    val date = dateFormat.parse("1995-03-12")
+    
+    // get execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val lineitems = getLineitemDataSet(env)
+      .filter( l => dateFormat.parse(l.shipDate).after(date) )
+      .as('id, 'extdPrice, 'discount, 'shipDate)
+
+    val customers = getCustomerDataSet(env)
+      .as('id, 'mktSegment)
+      .filter( 'mktSegment === "AUTOMOBILE" )
+
+    val orders = getOrdersDataSet(env)
+      .filter( o => dateFormat.parse(o.orderDate).before(date) )
+      .as('orderId, 'custId, 'orderDate, 'shipPrio)
+
+    val items =
+      orders.join(customers)
+        .where('custId === 'id)
+        .select('orderId, 'orderDate, 'shipPrio)
+      .join(lineitems)
+        .where('orderId === 'id)
+        .select(
+          'orderId,
+          'extdPrice * (Literal(1.0f) - 'discount) as 'revenue,
+          'orderDate,
+          'shipPrio)
+
+    val result = items
+      .groupBy('orderId, 'orderDate, 'shipPrio)
+      .select('orderId, 'revenue.sum, 'orderDate, 'shipPrio)
+
+    // emit result
+    result.writeAsCsv(outputPath, "\n", "|")
+
+    // execute program
+    env.execute("Scala TPCH Query 3 (Expression) Example")
+  }
+  
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+  
+  case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String)
+  case class Customer(id: Long, mktSegment: String)
+  case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+  
+  private var lineitemPath: String = null
+  private var customerPath: String = null
+  private var ordersPath: String = null
+  private var outputPath: String = null
+
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length == 4) {
+      lineitemPath = args(0)
+      customerPath = args(1)
+      ordersPath = args(2)
+      outputPath = args(3)
+      true
+    } else {
+      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
+          " Due to legal restrictions, we can not ship generated data.\n" +
+          " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
+          " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path>" + 
+                             "<orders-csv path> <result path>");
+      false
+    }
+  }
+  
+  private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
+    env.readCsvFile[Lineitem](
+        lineitemPath,
+        fieldDelimiter = "|",
+        includedFields = Array(0, 5, 6, 10) )
+  }
+
+  private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {
+    env.readCsvFile[Customer](
+        customerPath,
+        fieldDelimiter = "|",
+        includedFields = Array(0, 6) )
+  }
+  
+  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
+    env.readCsvFile[Order](
+        ordersPath,
+        fieldDelimiter = "|",
+        includedFields = Array(0, 1, 4, 7) )
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AggregationsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AggregationsITCase.scala
new file mode 100644
index 0000000..a0d2005
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AggregationsITCase.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+@RunWith(classOf[Parameterized])
+class AggregationsITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = null
+  private var expected: String = ""
+  private val _tempFolder = new TemporaryFolder()
+
+  @Rule
+  def tempFolder = _tempFolder
+
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
+  }
+
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  @Test
+  def testAggregationTypes: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).toExpression
+      .select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "231,1,21,21,11"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAggregationOnNonExistingField: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).toExpression
+      .select('foo.avg)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = ""
+  }
+
+  @Test
+  def testWorkingAggregationDataTypes: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toExpression
+      .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,1,1,1.5,1.5,2"
+  }
+
+  @Test
+  def testAggregationWithArithmetic: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toExpression
+      .select(('_1 + 2).avg + 2, '_2.count + " THE COUNT")
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "5.5,2 THE COUNT"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testNonWorkingAggregationDataTypes: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(("Hello", 1)).toExpression
+      .select('_1.sum)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = ""
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testNoNestedAggregations: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(("Hello", 1)).toExpression
+      .select('_2.sum.sum)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = ""
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AsITCase.scala
new file mode 100644
index 0000000..55969c4
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AsITCase.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class AsITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = null
+  private var expected: String = ""
+  private val _tempFolder = new TemporaryFolder()
+
+  @Rule
+  def tempFolder = _tempFolder
+
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
+  }
+
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  @Test
+  def testAs: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithToFewFields: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithToManyFields: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithAmbiguousFields: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithNonFieldReference1: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    // `as` only accepts plain field references, so an arithmetic expression must be rejected
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a + 1, 'b, 'b)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithNonFieldReference2: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    // `as` only accepts plain field references, so an aliased expression must be rejected
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a as 'foo, 'b, 'b)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/CastingITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/CastingITCase.scala
new file mode 100644
index 0000000..1515fb5
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/CastingITCase.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+@RunWith(classOf[Parameterized])
+class CastingITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = null
+  private var expected: String = ""
+  private val _tempFolder = new TemporaryFolder()
+
+  @Rule
+  def tempFolder = _tempFolder
+
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
+  }
+
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  @Test
+  def testAutoCastToString: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toExpression
+      .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d")
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1b,1s,1i,1L,1.0f,1.0d"
+  }
+
+  @Test
+  def testNumericAutoCastInArithmetic: Unit = {
+
+    // don't test everything, just some common cast directions
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toExpression
+      .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "2,2,2,2.0,2.0,2.0"
+  }
+
+  @Test
+  def testNumericAutoCastInComparison: Unit = {
+
+    // don't test everything, just some common cast directions
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d),
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).as('a, 'b, 'c, 'd, 'e, 'f)
+      .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d  && 'f > 1)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "2,2,2,2,2.0,2.0"
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/ExpressionsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/ExpressionsITCase.scala
new file mode 100644
index 0000000..46074f3
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/ExpressionsITCase.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+@RunWith(classOf[Parameterized])
+class ExpressionsITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = null
+  private var expected: String = ""
+  private val _tempFolder = new TemporaryFolder()
+
+  @Rule
+  def tempFolder = _tempFolder
+
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
+  }
+
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  @Test
+  def testArithmetic: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements((5, 10)).as('a, 'b)
+      .select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "0,10,2,10,1,-5"
+  }
+
+  @Test
+  def testLogic: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements((5, true)).as('a, 'b)
+      .select('b && true, 'b && false, 'b || false, !'b)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "true,false,true,false"
+  }
+
+  @Test
+  def testComparisons: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements((5, 5, 4)).as('a, 'b, 'c)
+      .select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "true,true,false,false,true"
+  }
+
+  @Test
+  def testBitwiseOperations: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val ds = env.fromElements((3.toByte, 5.toByte)).as('a, 'b)
+      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,7,6,-4"
+  }
+
+  @Test
+  def testBitwiseWithAutocast: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val ds = env.fromElements((3, 5.toByte)).as('a, 'b)
+      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,7,6,-4"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testBitwiseWithNonWorkingAutocast: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val ds = env.fromElements((3.0, 5)).as('a, 'b)
+      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,7,6,-4"
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/FilterITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/FilterITCase.scala
new file mode 100644
index 0000000..04937f2
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/FilterITCase.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions
+
+import org.apache.flink.api.expressions.tree.Literal
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit._
+
+
+@RunWith(classOf[Parameterized])
+class FilterITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = null
+  private var expected: String = null
+  private val _tempFolder = new TemporaryFolder()
+
+  @Rule
+  def tempFolder = _tempFolder
+
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
+  }
+
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  @Test
+  def testAllRejectingFilter: Unit = {
+    /*
+     * Test all-rejecting filter.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+    val filterDs = ds.filter( Literal(false) )
+
+    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "\n"
+  }
+
+  @Test
+  def testAllPassingFilter: Unit = {
+    /*
+     * Test all-passing filter.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+    val filterDs = ds.filter( Literal(true) )
+
+    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+  }
+
+  @Test
+  def testFilterOnStringTupleField: Unit = {
+    /*
+     * Test filter on String tuple field.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val filterDs = ds.filter( _._3.contains("world") )
+    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
+  }
+
+  @Test
+  def testFilterOnIntegerTupleField: Unit = {
+    /*
+     * Test filter on Integer tuple field.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+    val filterDs = ds.filter( 'a % 2 === 0 )
+
+    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+      "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+      "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
+  }
+
+  // These two are not done yet, but are planned
+
+  @Ignore
+  @Test
+  def testFilterBasicType: Unit = {
+    /*
+     * Test filter on basic type
+     */
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getStringDataSet(env)
+
+    val filterDs = ds.filter( _.startsWith("H") )
+
+    filterDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
+  }
+
+  @Ignore
+  @Test
+  def testFilterOnCustomType: Unit = {
+    /*
+     * Test filter on custom type
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getCustomTypeDataSet(env)
+    val filterDs = ds.filter( _.myString.contains("a") )
+    filterDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/GroupedAggreagationsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/GroupedAggreagationsITCase.scala
new file mode 100644
index 0000000..718ea6a
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/GroupedAggreagationsITCase.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+@RunWith(classOf[Parameterized])
+class GroupedAggreagationsITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = null
+  private var expected: String = "" // assigned by each test, read in after()
+  private val _tempFolder = new TemporaryFolder()
+
+  @Rule
+  def tempFolder = _tempFolder
+
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString // new file in the temp folder, as URI
+  }
+
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testGroupingOnNonExistentField: Unit = { // '_foo is not one of 'a, 'b, 'c
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+      .groupBy('_foo)
+      .select('a.avg)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = ""
+  }
+
+  @Test
+  def testGroupedAggregate: Unit = {
+
+    // groups by 'b and selects the grouping key together with the sum of 'a,
+    // so each expected line has the form "<b>,<sum of a>"
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+      .groupBy('b)
+      .select('b, 'a.sum)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
+  }
+
+  @Test
+  def testGroupingKeyForwardIfNotUsed: Unit = {
+
+    // the grouping key needs to be forwarded to the intermediate DataSet, even
+    // if we don't want the key in the output
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+      .groupBy('b)
+      .select('a.sum)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
+  }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/JoinITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/JoinITCase.scala
new file mode 100644
index 0000000..2cc37ff
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/JoinITCase.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+@RunWith(classOf[Parameterized])
+class JoinITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = null
+  private var expected: String = "" // assigned by each test, read in after()
+  private val _tempFolder = new TemporaryFolder()
+
+  @Rule
+  def tempFolder = _tempFolder
+
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString // new file in the temp folder, as URI
+  }
+
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  @Test
+  def testJoin: Unit = { // equi-join on 'b === 'e, projecting 'c and 'g
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+    val joinDs = ds1.join(ds2).where('b === 'e).select('c, 'g)
+
+    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
+  }
+
+  @Test
+  def testJoinWithFilter: Unit = { // join predicate plus the extra condition 'b < 2
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+    val joinDs = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
+
+    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "Hi,Hallo\n"
+  }
+
+  @Test
+  def testJoinWithMultipleKeys: Unit = { // NOTE(review): filter(), not where(), below
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+    val joinDs = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
+
+    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
+      "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testJoinNonExistingKey: Unit = { // 'foo is not a field of either input
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+    val joinDs = ds1.join(ds2).where('foo === 'e).select('c, 'g)
+
+    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = ""
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testJoinWithNonMatchingKeyTypes: Unit = { // presumably 'a and 'g differ in type — confirm
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+    val joinDs = ds1.join(ds2).where('a === 'g).select('c, 'g)
+
+    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = ""
+  }
+
+  @Test
+  def testJoinWithAggregation: Unit = { // counts 'g over the rows joined on 'a === 'd
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+    val joinDs = ds1.join(ds2).where('a === 'd).select('g.count)
+
+    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "6"
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/PageRankExpressionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/PageRankExpressionITCase.java b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/PageRankExpressionITCase.java
new file mode 100644
index 0000000..aefc8cf
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/PageRankExpressionITCase.java
@@ -0,0 +1,100 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.api.scala.expressions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.examples.scala.PageRankExpression;
+import org.apache.flink.test.testdata.PageRankData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+@RunWith(Parameterized.class)
+public class PageRankExpressionITCase extends JavaProgramTestBase {
+
+	private static final int NUM_PROGRAMS = 2; // number of variants handled by runProgram()
+
+	private int curProgId = config.getInteger("ProgramId", -1); // set in getConfigurations()
+
+	private String verticesPath;
+	private String edgesPath;
+	private String resultPath;
+	private String expectedResult;
+
+	public PageRankExpressionITCase(Configuration config) {
+		super(config);
+	}
+
+	@Override
+	protected void preSubmit() throws Exception { // prepares the input files and result path
+		resultPath = getTempDirPath("result");
+		verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES);
+		edgesPath = createTempFile("edges.txt", PageRankData.EDGES);
+	}
+
+	@Override
+	protected void testProgram() throws Exception { // runs the variant selected by curProgId
+		expectedResult = runProgram(curProgId);
+	}
+
+	@Override
+	protected void postSubmit() throws Exception { // checks the output with a delta of 0.01
+		compareKeyValueParisWithDelta(expectedResult, resultPath, " ", 0.01);
+	}
+
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for(int i=1; i <= NUM_PROGRAMS; i++) { // one Configuration per program id, 1-based
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+
+		return toParameterList(tConfigs);
+	}
+
+
+	public String runProgram(int progId) throws Exception { // returns the expected ranks
+
+		switch(progId) {
+		case 1: {
+			PageRankExpression.main(new String[] {verticesPath, edgesPath, resultPath,
+					PageRankData.NUM_VERTICES + "", "3"});
+			return PageRankData.RANKS_AFTER_3_ITERATIONS;
+		}
+		case 2: {
+			// start with a very high number of iteration such that the dynamic convergence criterion must handle termination
+			PageRankExpression.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"});
+			return PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
+		}
+
+		default:
+			throw new IllegalArgumentException("Invalid program id");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/SelectITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/SelectITCase.scala
new file mode 100644
index 0000000..6acb016
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/SelectITCase.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+@RunWith(classOf[Parameterized])
+class SelectITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = null
+  private var expected: String = "" // assigned by each test, read in after()
+  private val _tempFolder = new TemporaryFolder()
+
+  @Rule
+  def tempFolder = _tempFolder
+
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString // new file in the temp folder, as URI
+  }
+
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  @Test
+  def testSimpleSelectAll: Unit = { // selects all fields by their default names '_1, '_2, '_3
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).toExpression.select('_1, '_2, '_3)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+  }
+
+  @Test
+  def testSimpleSelectAllWithAs: Unit = { // same selection, with fields renamed to 'a, 'b, 'c
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+  }
+
+  @Test
+  def testSimpleSelectWithNaming: Unit = { // renames inside select, then selects the new names
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).toExpression
+      .select('_1 as 'a, '_2 as 'b)
+      .select('a, 'b)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithToFewFields: Unit = { // two names given for a three-field data set
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithToManyFields: Unit = { // four names given for a three-field data set
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithAmbiguousFields: Unit = { // the name 'b is given to two fields
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c as 'b)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/StringExpressionsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/StringExpressionsITCase.scala
new file mode 100644
index 0000000..b04d4dd
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/StringExpressionsITCase.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+@RunWith(classOf[Parameterized])
+class StringExpressionsITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = null
+  private var expected: String = "" // assigned by each test, read in after()
+  private val _tempFolder = new TemporaryFolder()
+
+  @Rule
+  def tempFolder = _tempFolder
+
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString // new file in the temp folder, as URI
+  }
+
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  @Test
+  def testSubstring: Unit = { // substring(0, 'b): the end index is taken from field 'b
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
+      .select('a.substring(0, 'b))
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "AA\nB"
+  }
+
+  @Test
+  def testSubstringWithMaxEnd: Unit = { // substring('b): from index 'b to the end
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b)
+      .select('a.substring('b))
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "CD\nBCD"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testNonWorkingSubstring1: Unit = { // 'b holds Doubles here, used as the end index
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b)
+      .select('a.substring(0, 'b))
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "AAA\nBB"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testNonWorkingSubstring2: Unit = { // 'b holds Strings here, used as the begin index
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b)
+      .select('a.substring('b, 15))
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "AAA\nBB"
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index 1b98cbe..d6bf861 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -42,6 +42,7 @@ under the License.
 		<module>flink-hbase</module>
 		<module>flink-gelly</module>
 		<module>flink-hcatalog</module>
+		<module>flink-expressions</module>
 	</modules>
 	
 	<!-- See main pom.xml for explanation of profiles -->


Mime
View raw message