flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [4/4] flink git commit: [FLINK-1159] [Scala API] Add API extension to support case-style anonymous functions
Date Mon, 04 Apr 2016 19:32:23 GMT
[FLINK-1159] [Scala API] Add API extension to support case-style anonymous functions

This closes #1704


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5cb84f18
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5cb84f18
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5cb84f18

Branch: refs/heads/master
Commit: 5cb84f185963fa89be5d0c4e83bad66bac44d84d
Parents: 3465580
Author: Stefano Baghino <stefano@baghino.me>
Authored: Wed Feb 24 13:05:16 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Apr 4 21:31:49 2016 +0200

----------------------------------------------------------------------
 docs/apis/batch/index.md                        |   9 +
 docs/apis/java8.md                              |   2 +-
 docs/apis/scala_api_extensions.md               | 409 +++++++++++++++++++
 docs/apis/streaming/index.md                    |  10 +
 .../OnCoGroupDataSet.scala                      |  51 +++
 .../acceptPartialFunctions/OnCrossDataSet.scala |  48 +++
 .../impl/acceptPartialFunctions/OnDataSet.scala | 118 ++++++
 .../OnGroupedDataSet.scala                      |  87 ++++
 .../OnHalfUnfinishedKeyPairOperation.scala      |  46 +++
 .../OnJoinFunctionAssigner.scala                |  48 +++
 .../OnUnfinishedKeyPairOperation.scala          |  48 +++
 .../flink/api/scala/extensions/package.scala    |  95 +++++
 .../extensions/base/AcceptPFTestBase.scala      |  38 ++
 .../scala/extensions/data/KeyValuePair.scala    |  26 ++
 .../OnCoGroupDataSetTest.scala                  |  57 +++
 .../OnCrossDataSetTest.scala                    |  49 +++
 .../acceptPartialFunctions/OnDataSetTest.scala  | 171 ++++++++
 .../OnGroupedDataSetTest.scala                  | 120 ++++++
 .../OnHalfUnfinishedKeyPairOperationTest.scala  | 150 +++++++
 .../OnJoinFunctionAssignerTest.scala            | 141 +++++++
 .../OnUnfinishedKeyPairOperationTest.scala      | 137 +++++++
 .../streaming/api/scala/ConnectedStreams.scala  |   5 +-
 .../streaming/api/scala/JoinedStreams.scala     |   2 +-
 .../OnConnectedStream.scala                     |  79 ++++
 .../acceptPartialFunctions/OnDataStream.scala   |  78 ++++
 .../acceptPartialFunctions/OnJoinedStream.scala |  50 +++
 .../acceptPartialFunctions/OnKeyedStream.scala  |  55 +++
 .../OnWindowedStream.scala                      |  90 ++++
 .../api/scala/extensions/package.scala          |  89 ++++
 .../extensions/base/AcceptPFTestBase.scala      |  54 +++
 .../scala/extensions/data/KeyValuePair.scala    |  26 ++
 .../OnConnectedDataStreamTest.scala             | 102 +++++
 .../OnDataStreamTest.scala                      | 109 +++++
 .../OnJoinedDataStreamTest.scala                |  67 +++
 .../OnKeyedDataStreamTest.scala                 |  69 ++++
 .../OnWindowedDataStreamTest.scala              |  97 +++++
 36 files changed, 2827 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/docs/apis/batch/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/index.md b/docs/apis/batch/index.md
index 2a954d9..ff53219 100644
--- a/docs/apis/batch/index.md
+++ b/docs/apis/batch/index.md
@@ -729,6 +729,15 @@ val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)
   </tbody>
 </table>
 
+Extraction from tuples, case classes and collections via anonymous pattern matching, like the following:
+{% highlight scala %}
+val data: DataSet[(Int, String, Double)] = // [...]
+data.map {
+  case (id, name, temperature) => // [...]
+}
+{% endhighlight %}
+is not supported by the API out-of-the-box. To use this feature, you should use a <a href="../scala_api_extensions.html">Scala API extension</a>.
+
 </div>
 </div>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/docs/apis/java8.md
----------------------------------------------------------------------
diff --git a/docs/apis/java8.md b/docs/apis/java8.md
index 53269e3..821038b 100644
--- a/docs/apis/java8.md
+++ b/docs/apis/java8.md
@@ -2,7 +2,7 @@
 title: "Java 8 Programming Guide"
 # Top-level navigation
 top-nav-group: apis
-top-nav-pos: 11
+top-nav-pos: 12
 top-nav-title: Java 8
 ---
 <!--

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/docs/apis/scala_api_extensions.md
----------------------------------------------------------------------
diff --git a/docs/apis/scala_api_extensions.md b/docs/apis/scala_api_extensions.md
new file mode 100644
index 0000000..e3268bf
--- /dev/null
+++ b/docs/apis/scala_api_extensions.md
@@ -0,0 +1,409 @@
+---
+title: "Scala API Extensions"
+# Top-level navigation
+top-nav-group: apis
+top-nav-pos: 11
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+In order to keep a fair amount of consistency between the Scala and Java APIs, some
+of the features that allow a high level of expressiveness in Scala have been left
+out of the standard APIs for both batch and streaming.
+
+If you want to _enjoy the full Scala experience_ you can choose to opt in to
+extensions that enhance the Scala API via implicit conversions.
+
+To use all the available extensions, you can just add a simple `import` for the
+DataSet API
+
+{% highlight scala %}
+import org.apache.flink.api.scala.extensions._
+{% endhighlight %}
+
+or the DataStream API
+
+{% highlight scala %}
+import org.apache.flink.streaming.api.scala.extensions._
+{% endhighlight %}
+
+Alternatively, you can import individual extensions _à la carte_ to only use those
+you prefer.
+
+## Accept partial functions
+
+Normally, both the DataSet and DataStream APIs don't accept anonymous pattern
+matching functions to deconstruct tuples, case classes or collections, like the
+following:
+
+{% highlight scala %}
+val data: DataSet[(Int, String, Double)] = // [...]
+data.map {
+  case (id, name, temperature) => // [...]
+  // The previous line causes the following compilation error:
+  // "The argument types of an anonymous function must be fully known. (SLS 8.5)"
+}
+{% endhighlight %}
+
+This extension introduces new methods in both the DataSet and DataStream Scala API
+that have a one-to-one correspondence in the extended API. These delegating methods
+do support anonymous pattern matching functions.
+
+#### DataSet API
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Method</th>
+      <th class="text-left" style="width: 20%">Original</th>
+      <th class="text-center">Example</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>mapWith</strong></td>
+      <td><strong>map (DataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data.mapWith {
+  case (_, value) => value.toString
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>mapPartitionWith</strong></td>
+      <td><strong>mapPartition (DataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data.mapPartitionWith {
+  case head #:: _ => head
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>flatMapWith</strong></td>
+      <td><strong>flatMap (DataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data.flatMapWith {
+  case (_, name, visitTimes) => visitTimes.map(name -> _)
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>filterWith</strong></td>
+      <td><strong>filter (DataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data.filterWith {
+  case Train(_, isOnTime) => isOnTime
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>reduceWith</strong></td>
+      <td><strong>reduce (DataSet, GroupedDataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data.reduceWith {
+  case ((key, amount1), (_, amount2)) => key -> (amount1 + amount2)
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>reduceGroupWith</strong></td>
+      <td><strong>reduceGroup (GroupedDataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data.reduceGroupWith {
+  case id #:: value #:: _ => id -> value
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>groupingBy</strong></td>
+      <td><strong>groupBy (DataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data.groupingBy {
+  case (id, _, _) => id
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>sortGroupWith</strong></td>
+      <td><strong>sortGroup (GroupedDataSet)</strong></td>
+      <td>
+{% highlight scala %}
+grouped.sortGroupWith(Order.ASCENDING) {
+  case House(_, value) => value
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>combineGroupWith</strong></td>
+      <td><strong>combineGroup (GroupedDataSet)</strong></td>
+      <td>
+{% highlight scala %}
+grouped.combineGroupWith {
+  case header #:: amounts => amounts.sum
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>projecting</strong></td>
+      <td><strong>apply (JoinDataSet, CrossDataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data1.join(data2).
+  whereClause { case (pk, _) => pk }.
+  isEqualTo { case (_, fk) => fk }.
+  projecting {
+    case ((pk, tx), (products, fk)) => tx -> products
+  }
+
+data1.cross(data2).projecting {
+  case ((a, _), (_, b)) => a -> b
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>projecting</strong></td>
+      <td><strong>apply (CoGroupDataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data1.coGroup(data2).
+  whereClause { case (pk, _) => pk }.
+  isEqualTo { case (_, fk) => fk }.
+  projecting {
+    case (head1 #:: _, head2 #:: _) => head1 -> head2
+  }
+{% endhighlight %}
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+#### DataStream API
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Method</th>
+      <th class="text-left" style="width: 20%">Original</th>
+      <th class="text-center">Example</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>mapWith</strong></td>
+      <td><strong>map (DataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.mapWith {
+  case (_, value) => value.toString
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>mapPartitionWith</strong></td>
+      <td><strong>mapPartition (DataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.mapPartitionWith {
+  case head #:: _ => head
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>flatMapWith</strong></td>
+      <td><strong>flatMap (DataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.flatMapWith {
+  case (_, name, visits) => visits.map(name -> _)
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>filterWith</strong></td>
+      <td><strong>filter (DataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.filterWith {
+  case Train(_, isOnTime) => isOnTime
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>keyingBy</strong></td>
+      <td><strong>keyBy (DataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.keyingBy {
+  case (id, _, _) => id
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>mapWith</strong></td>
+      <td><strong>map (ConnectedDataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.mapWith(
+  map1 = { case (_, value) => value.toString },
+  map2 = { case (_, _, value, _) => value + 1 }
+)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>flatMapWith</strong></td>
+      <td><strong>flatMap (ConnectedDataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.flatMapWith(
+  flatMap1 = { case (_, json) => parse(json) },
+  flatMap2 = { case (_, _, json, _) => parse(json) }
+)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>keyingBy</strong></td>
+      <td><strong>keyBy (ConnectedDataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.keyingBy(
+  key1 = { case (_, timestamp) => timestamp },
+  key2 = { case (id, _, _) => id }
+)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>reduceWith</strong></td>
+      <td><strong>reduce (KeyedDataStream, WindowedDataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.reduceWith {
+  case ((key, sum1), (_, sum2)) => key -> (sum1 + sum2)
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>foldWith</strong></td>
+      <td><strong>fold (KeyedDataStream, WindowedDataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.foldWith(User(bought = 0)) {
+  case (User(b), (_, items)) => User(b + items.size)
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>applyWith</strong></td>
+      <td><strong>apply (WindowedDataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.applyWith(0)(
+  foldFunction = { case (sum, amount) => sum + amount },
+  windowFunction = {
+    case (k, w, sum) => // [...]
+  }
+)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>projecting</strong></td>
+      <td><strong>apply (JoinedDataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data1.join(data2).
+  whereClause { case (pk, _) => pk }.
+  isEqualTo { case (_, fk) => fk }.
+  projecting {
+    case ((pk, tx), (products, fk)) => tx -> products
+  }
+{% endhighlight %}
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+
+
+For more information on the semantics of each method, please refer to the
+[DataSet](batch/index.html) and [DataStream](streaming/index.html) API documentation.
+
+To use this extension exclusively, you can add the following `import`:
+
+{% highlight scala %}
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+{% endhighlight %}
+
+for the DataSet extensions and
+
+{% highlight scala %}
+import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
+{% endhighlight %}
+
+for the DataStream extensions.
+
+The following snippet shows a minimal example of how to use these extension
+methods together (with the DataSet API):
+
+{% highlight scala %}
+object Main {
+  import org.apache.flink.api.scala._
+  import org.apache.flink.api.scala.extensions._
+  case class Point(x: Double, y: Double)
+  def main(args: Array[String]): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
+    ds.filterWith {
+      case Point(x, _) => x > 1
+    }.reduceWith {
+      case (Point(x1, y1), Point(x2, y2)) => Point(x1 + x2, y1 + y2)
+    }.mapWith {
+      case Point(x, y) => (x, y)
+    }.flatMapWith {
+      case (x, y) => Seq("x" -> x, "y" -> y)
+    }.groupingBy {
+      case (id, value) => id
+    }
+  }
+}
+{% endhighlight %}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/docs/apis/streaming/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/index.md b/docs/apis/streaming/index.md
index 6d69459..a899844 100644
--- a/docs/apis/streaming/index.md
+++ b/docs/apis/streaming/index.md
@@ -873,6 +873,16 @@ stream.assignTimestamps { timestampExtractor }
   </tbody>
 </table>
 
+Extraction from tuples, case classes and collections via anonymous pattern matching, like the following:
+{% highlight scala %}
+val data: DataStream[(Int, String, Double)] = // [...]
+data.map {
+  case (id, name, temperature) => // [...]
+}
+{% endhighlight %}
+is not supported by the API out-of-the-box. To use this feature, you should use a <a href="../scala_api_extensions.html">Scala API extension</a>.
+
+
 </div>
 </div>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCoGroupDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCoGroupDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCoGroupDataSet.scala
new file mode 100644
index 0000000..0337d44
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCoGroupDataSet.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{CoGroupDataSet, DataSet}
+
+import scala.reflect.ClassTag
+
+/**
+  * Decorates a [[CoGroupDataSet]] so that its projection can be written as an
+  * anonymous pattern-matching function that deconstructs tuples, case class
+  * instances or collections.
+  *
+  * @param ds The co-group data set being decorated
+  * @tparam L Item type of the left input data set
+  * @tparam R Item type of the right input data set
+  */
+class OnCoGroupDataSet[L, R](ds: CoGroupDataSet[L, R]) {
+
+  /**
+    * Completes the co-group: for each pair of co-grouped element groups, both
+    * groups are converted to `Stream`s and passed to `fun`, whose result is
+    * used as the projected output.
+    *
+    * @param fun Projection applied to each pair of co-grouped element groups
+    * @tparam O Result type of the projection; its type information must be available
+    * @return The co-grouped data set of projected Os
+    */
+  @PublicEvolving
+  def projecting[O: TypeInformation: ClassTag](fun: (Stream[L], Stream[R]) => O): DataSet[O] = {
+    val viaStreams = (leftGroup: Iterator[L], rightGroup: Iterator[R]) =>
+      fun(leftGroup.toStream, rightGroup.toStream)
+    ds(viaStreams)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCrossDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCrossDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCrossDataSet.scala
new file mode 100644
index 0000000..a0d4ea1
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCrossDataSet.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{CrossDataSet, DataSet}
+
+import scala.reflect.ClassTag
+
+/**
+  * Wraps a cross data set, allowing to use anonymous partial functions to
+  * perform extraction of items in a tuple, case class instance or collection
+  *
+  * @param ds The wrapped cross data set
+  * @tparam L The type of the left data set items
+  * @tparam R The type of the right data set items
+  */
+class OnCrossDataSet[L, R](ds: CrossDataSet[L, R]) {
+
+  /**
+    * Starting from a cross data set, uses the function `fun` to project each pair
+    * of elements (one from each input data set) into the resulting data set
+    *
+    * @param fun The function that defines the projection of the cross product
+    * @tparam O The return type of the projection, for which type information must be known
+    * @return A data set of Os
+    */
+  @PublicEvolving
+  def projecting[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O] =
+    ds(fun)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala
new file mode 100644
index 0000000..b2521b0
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{DataSet, GroupedDataSet}
+
+import scala.reflect.ClassTag
+
+/**
+  * Wraps a data set, allowing to use anonymous partial functions to
+  * perform extraction of items in a tuple, case class instance or collection
+  *
+  * @param ds The wrapped data set
+  * @tparam T The type of the data set items
+  */
+class OnDataSet[T](ds: DataSet[T]) {
+
+  /**
+    * Applies a function `fun` to each item of the data set
+    *
+    * @param fun The function to be applied to each item
+    * @tparam R The type of the items in the returned data set
+    * @return A dataset of Rs
+    */
+  @PublicEvolving
+  def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R] =
+    ds.map(fun)
+
+  /**
+    * Applies a function `fun` to a partition as a whole
+    *
+    * The items of the partition are handed to `fun` as a `Stream`, and the single
+    * value returned by `fun` is the only item emitted for that partition.
+    *
+    * @param fun The function to be applied on the whole partition
+    * @tparam R The type of the items in the returned data set
+    * @return A dataset of Rs, one per partition
+    */
+  @PublicEvolving
+  def mapPartitionWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] =
+    ds.mapPartition {
+      (it, out) =>
+        out.collect(fun(it.toStream))
+    }
+
+  /**
+    * Applies a function `fun` to each item of the dataset, producing a collection of items
+    * that will be flattened in the resulting data set
+    *
+    * @param fun The function to be applied to each item
+    * @tparam R The type of the items in the returned data set
+    * @return A dataset of Rs
+    */
+  @PublicEvolving
+  def flatMapWith[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R] =
+    ds.flatMap(fun)
+
+  /**
+    * Applies a predicate `fun` to each item of the data set, keeping only those for which
+    * the predicate holds
+    *
+    * @param fun The predicate to be tested on each item
+    * @return A dataset of the Ts that satisfy the predicate
+    */
+  @PublicEvolving
+  def filterWith(fun: T => Boolean): DataSet[T] =
+    ds.filter(fun)
+
+  /**
+    * Applies a reducer `fun` to the whole data set, combining its items pairwise
+    *
+    * @param fun The reducing function to be applied on the whole data set
+    * @return A data set of Ts
+    */
+  @PublicEvolving
+  def reduceWith(fun: (T, T) => T): DataSet[T] =
+    ds.reduce(fun)
+
+  /**
+    * Applies a group reducer `fun` to the whole (ungrouped) data set, which is
+    * handed to `fun` as a single `Stream`; the value returned by `fun` is emitted
+    *
+    * @param fun The function to be applied to all the items of the data set
+    * @tparam R The type of the items in the returned data set
+    * @return A dataset of Rs
+    */
+  @PublicEvolving
+  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] =
+    ds.reduceGroup {
+      (it, out) =>
+        out.collect(fun(it.toStream))
+    }
+
+  /**
+    * Groups the items according to a grouping function `fun`
+    *
+    * @param fun The grouping function
+    * @tparam K The return type of the grouping function, for which type information must be known
+    * @return A grouped data set of Ts
+    */
+  @PublicEvolving
+  def groupingBy[K: TypeInformation](fun: T => K): GroupedDataSet[T] =
+    ds.groupBy(fun)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala
new file mode 100644
index 0000000..07abccb
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{DataSet, GroupedDataSet}
+
+import scala.reflect.ClassTag
+
+/**
+  * Wraps a grouped data set, allowing to use anonymous partial functions to
+  * perform extraction of items in a tuple, case class instance or collection
+  *
+  * @param ds The wrapped grouped data set
+  * @tparam T The type of the grouped data set items
+  */
+class OnGroupedDataSet[T](ds: GroupedDataSet[T]) {
+
+  /**
+    * Sorts the items within each group using a key extracted by `fun` and an `Order`
+    *
+    * @param order The ordering strategy (ascending, descending, etc.)
+    * @param fun The key extraction function, defining the sorting key
+    * @tparam K The key type
+    * @return The grouped data set, with each group sorted
+    */
+  @PublicEvolving
+  def sortGroupWith[K: TypeInformation](order: Order)(fun: T => K): GroupedDataSet[T] =
+    ds.sortGroup(fun, order)
+
+  /**
+    * Reduces each group of the data set with a reducer `fun`
+    *
+    * @param fun The reducing function
+    * @return A data set of Ts reduced group-wise
+    */
+  @PublicEvolving
+  def reduceWith(fun: (T, T) => T): DataSet[T] =
+    ds.reduce(fun)
+
+  /**
+    * Reduces the data set group-wise with a reducer `fun`; each group is handed to
+    * `fun` as a `Stream` and the value returned by `fun` is emitted for that group
+    *
+    * @param fun The reducing function
+    * @tparam R The type of the items in the resulting data set
+    * @return A data set of Rs reduced group-wise
+    */
+  @PublicEvolving
+  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] =
+    ds.reduceGroup {
+      (it, out) =>
+        out.collect(fun(it.toStream))
+    }
+
+  /**
+    * Same as a group-wise reducing operation but only acts locally,
+    * ideal to perform pre-aggregation before a reduction.
+    *
+    * @param fun The combining function, receiving the items of a group as a `Stream`
+    * @tparam R The type of the items in the resulting data set
+    * @return A data set of Rs combined group-wise
+    */
+  @PublicEvolving
+  def combineGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] =
+    ds.combineGroup {
+      (it, out) =>
+        out.collect(fun(it.toStream))
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnHalfUnfinishedKeyPairOperation.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnHalfUnfinishedKeyPairOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnHalfUnfinishedKeyPairOperation.scala
new file mode 100644
index 0000000..a77c405
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnHalfUnfinishedKeyPairOperation.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.HalfUnfinishedKeyPairOperation
+/**
+  * Wraps a half-unfinished key pair operation (a join or co-group whose left key has
+  * already been defined), allowing to use anonymous partial functions to perform
+  * extraction of items in a tuple, case class instance or collection
+  *
+  * @param ds The wrapped half-unfinished key pair operation
+  * @tparam L The type of the left data set items
+  * @tparam R The type of the right data set items
+  * @tparam O The type produced by the operation once the right key is defined
+  */
+class OnHalfUnfinishedKeyPairOperation[L, R, O](ds: HalfUnfinishedKeyPairOperation[L, R, O]) {
+
+  /**
+    * Completes a join or co-group operation, defining the second half of
+    * the where clause with an equality over the right data set items.
+    *
+    * @param fun The function that extracts the key from the right data set items
+    * @tparam K The type of the key, for which type information must be known
+    * @return The O produced by the wrapped operation once both keys are defined
+    */
+  @PublicEvolving
+  def isEqualTo[K: TypeInformation](fun: R => K): O =
+    ds.equalTo(fun)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnJoinFunctionAssigner.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnJoinFunctionAssigner.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnJoinFunctionAssigner.scala
new file mode 100644
index 0000000..4ab41e5
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnJoinFunctionAssigner.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{DataSet, JoinFunctionAssigner}
+
+import scala.reflect.ClassTag
+
+/**
+  * Enriches a join function assigner with `projecting`, a uniquely named method that
+  * accepts anonymous partial functions, so that the joined items (tuples, case class
+  * instances or collections) can be decomposed through pattern matching
+  *
+  * @param ds The join function assigner being wrapped
+  * @tparam L The type of the left data set items
+  * @tparam R The type of the right data set items
+  */
+class OnJoinFunctionAssigner[L, R](ds: JoinFunctionAssigner[L, R]) {
+
+  /**
+    * Completes the join by applying `fun` to each joined pair of left and right items,
+    * projecting the pair onto a single item of the resulting data set
+    *
+    * @param fun The projection applied to each joined pair
+    * @tparam O The return type of the projection, for which type information must be known
+    * @return A fully joined data set of Os
+    */
+  @PublicEvolving
+  def projecting[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O] = {
+    val projection: (L, R) => O = fun
+    ds.apply(projection)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnUnfinishedKeyPairOperation.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnUnfinishedKeyPairOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnUnfinishedKeyPairOperation.scala
new file mode 100644
index 0000000..4fa6fcc
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnUnfinishedKeyPairOperation.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{HalfUnfinishedKeyPairOperation, UnfinishedKeyPairOperation}
+
+/**
+  * Wraps an unfinished key pair operation (a join or co-group whose keys have not been
+  * defined yet), allowing to use anonymous partial functions to perform extraction of
+  * items in a tuple, case class instance or collection
+  *
+  * @param ds The wrapped unfinished key pair operation
+  * @tparam L The type of the left data set items
+  * @tparam R The type of the right data set items
+  * @tparam O The type produced by the operation once both keys are defined
+  */
+class OnUnfinishedKeyPairOperation[L, R, O](ds: UnfinishedKeyPairOperation[L, R, O]) {
+
+  /**
+    * Initiates a join or co-group operation, defining the first half of
+    * the where clause with the key of the left data set items that will be
+    * checked for equality with the one provided by the second half.
+    *
+    * @param fun The function that extracts the key from the left data set items
+    * @tparam K The type of the key, for which type information must be known
+    * @return A half-unfinished key pair operation, still awaiting the right key
+    */
+  @PublicEvolving
+  def whereClause[K: TypeInformation](fun: L => K): HalfUnfinishedKeyPairOperation[L, R, O] =
+    ds.where(fun)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/package.scala
new file mode 100644
index 0000000..7e5ab8a
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/package.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions._
+
+/**
+  * acceptPartialFunctions extends the original DataSet with methods with unique names
+  * that delegate to core higher-order functions (e.g. `map`) so that we can work around
+  * the fact that overloaded methods taking functions as parameters can't accept partial
+  * functions as well. This enables the possibility to directly apply pattern matching
+  * to decompose inputs such as tuples, case classes and collections.
+  *
+  * The following is a small example that showcases how this extension works on
+  * a Flink data set:
+  *
+  * {{{
+  *   object Main {
+  *     import org.apache.flink.api.scala._
+  *     import org.apache.flink.api.scala.extensions._
+  *     case class Point(x: Double, y: Double)
+  *     def main(args: Array[String]): Unit = {
+  *       val env = ExecutionEnvironment.getExecutionEnvironment
+  *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
+  *       ds.filterWith {
+  *         case Point(x, _) => x > 1
+  *       }.reduceWith {
+  *         case (Point(x1, y1), Point(x2, y2)) => Point(x1 + x2, y1 + y2)
+  *       }.mapWith {
+  *         case Point(x, y) => (x, y)
+  *       }.flatMapWith {
+  *         case (x, y) => Seq('x' -> x, 'y' -> y)
+  *       }.groupingBy {
+  *         case (id, _) => id
+  *       }
+  *     }
+  *   }
+  * }}}
+  *
+  * The extension consists of several implicit conversions over all the data set representations
+  * that could gain from this feature. To use this set of extension methods the user has to
+  * explicitly opt in by importing `org.apache.flink.api.scala.extensions.acceptPartialFunctions`
+  * (or everything in `org.apache.flink.api.scala.extensions`, as in the example above).
+  *
+  * For more information and usage examples please consult the Apache Flink official documentation.
+  *
+  */
+package object extensions {
+
+  /** Enables the uniquely named methods of [[OnDataSet]] on a [[DataSet]]. */
+  @PublicEvolving
+  implicit def acceptPartialFunctions[T](ds: DataSet[T]): OnDataSet[T] =
+    new OnDataSet[T](ds)
+
+  /** Enables the uniquely named methods of [[OnJoinFunctionAssigner]] on a join. */
+  @PublicEvolving
+  implicit def acceptPartialFunctions[L, R](
+      ds: JoinFunctionAssigner[L, R]): OnJoinFunctionAssigner[L, R] =
+    new OnJoinFunctionAssigner[L, R](ds)
+
+  /** Enables the uniquely named methods of [[OnCrossDataSet]] on a [[CrossDataSet]]. */
+  @PublicEvolving
+  implicit def acceptPartialFunctions[L, R](ds: CrossDataSet[L, R]): OnCrossDataSet[L, R] =
+    new OnCrossDataSet[L, R](ds)
+
+  /** Enables the uniquely named methods of [[OnGroupedDataSet]] on a [[GroupedDataSet]]. */
+  @PublicEvolving
+  implicit def acceptPartialFunctions[T](ds: GroupedDataSet[T]): OnGroupedDataSet[T] =
+    new OnGroupedDataSet[T](ds)
+
+  /** Enables the uniquely named methods of [[OnCoGroupDataSet]] on a [[CoGroupDataSet]]. */
+  @PublicEvolving
+  implicit def acceptPartialFunctions[L, R](ds: CoGroupDataSet[L, R]): OnCoGroupDataSet[L, R] =
+    new OnCoGroupDataSet[L, R](ds)
+
+  /** Enables `isEqualTo` on a key pair operation whose left key is already defined. */
+  @PublicEvolving
+  implicit def acceptPartialFunctions[L, R, O](
+      ds: HalfUnfinishedKeyPairOperation[L, R, O]): OnHalfUnfinishedKeyPairOperation[L, R, O] =
+    new OnHalfUnfinishedKeyPairOperation[L, R, O](ds)
+
+  /** Enables `whereClause` on a key pair operation whose keys are not defined yet. */
+  @PublicEvolving
+  implicit def acceptPartialFunctions[L, R, O](
+      ds: UnfinishedKeyPairOperation[L, R, O]): OnUnfinishedKeyPairOperation[L, R, O] =
+    new OnUnfinishedKeyPairOperation[L, R, O](ds)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/base/AcceptPFTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/base/AcceptPFTestBase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/base/AcceptPFTestBase.scala
new file mode 100644
index 0000000..c2e13fe
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/base/AcceptPFTestBase.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.base
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.extensions.data.KeyValuePair
+import org.apache.flink.util.TestLogger
+import org.scalatest.junit.JUnitSuiteLike
+
+/**
+  * Shared fixtures for the tests of the `acceptPartialFunctions` extension: two small
+  * data sets, one of tuples and one of case class instances, each also grouped by key
+  */
+private[extensions] abstract class AcceptPFTestBase extends TestLogger with JUnitSuiteLike {
+
+  private val env = ExecutionEnvironment.getExecutionEnvironment
+
+  protected val tuples: DataSet[(Int, String)] =
+    env.fromElements(1 -> "hello", 2 -> "world")
+
+  protected val caseObjects: DataSet[KeyValuePair] =
+    env.fromElements(KeyValuePair(1, "hello"), KeyValuePair(2, "world"))
+
+  // Grouped variants, keyed by the numeric identifier of each element.
+  protected val groupedTuples: GroupedDataSet[(Int, String)] =
+    tuples.groupBy(_._1)
+
+  protected val groupedCaseObjects: GroupedDataSet[KeyValuePair] =
+    caseObjects.groupBy(_.id)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/data/KeyValuePair.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/data/KeyValuePair.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/data/KeyValuePair.scala
new file mode 100644
index 0000000..6d02393
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/data/KeyValuePair.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.data
+
+/**
+  * Simple immutable case class to test the `acceptPartialFunctions` extension
+  * with case-class pattern matching
+  *
+  * @param id A numerical identifier
+  * @param value A textual value
+  */
+private[extensions] final case class KeyValuePair(id: Int, value: String)

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCoGroupDataSetTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCoGroupDataSetTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCoGroupDataSetTest.scala
new file mode 100644
index 0000000..a20f977
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCoGroupDataSetTest.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.api.java.operators.CoGroupOperator
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.api.scala.extensions.data.KeyValuePair
+import org.junit.Test
+
+/**
+  * Checks that `projecting` on a co-grouped data set, keyed through `whereClause`
+  * and `isEqualTo`, accepts partial functions and yields a co-group operator
+  */
+class OnCoGroupDataSetTest extends AcceptPFTestBase {
+
+  @Test
+  def testCoGroupProjectingOnTuple(): Unit = {
+    val keyed =
+      tuples.coGroup(tuples).whereClause {
+        case (id, _) => id
+      }.isEqualTo {
+        case (id, _) => id
+      }
+    val test = keyed.projecting {
+      case ((_, v1) #:: _, (_, v2) #:: _) => s"$v1 $v2"
+    }
+    assertIsCoGroup(test, "projecting on tuples should produce a CoGroupOperator")
+  }
+
+  @Test
+  def testCoGroupProjectingOnCaseClass(): Unit = {
+    val keyed =
+      caseObjects.coGroup(caseObjects).whereClause {
+        case KeyValuePair(id, _) => id
+      }.isEqualTo {
+        case KeyValuePair(id, _) => id
+      }
+    val test = keyed.projecting {
+      case (KeyValuePair(_, v1) #:: _, KeyValuePair(_, v2) #:: _) => s"$v1 $v2"
+    }
+    assertIsCoGroup(test, "projecting on case objects should produce a CoGroupOperator")
+  }
+
+  /** Fails with `message` unless `ds` is backed by a Java [[CoGroupOperator]]. */
+  private def assertIsCoGroup(ds: DataSet[_], message: String): Unit =
+    assert(ds.javaSet.isInstanceOf[CoGroupOperator[_, _, _]], message)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCrossDataSetTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCrossDataSetTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCrossDataSetTest.scala
new file mode 100644
index 0000000..650c9ab
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCrossDataSetTest.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.api.java.operators.CrossOperator
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.api.scala.extensions.data.KeyValuePair
+import org.junit.Test
+
+/**
+  * Checks that `projecting` on a cross product accepts partial functions
+  * and yields a cross operator
+  */
+class OnCrossDataSetTest extends AcceptPFTestBase {
+
+  @Test
+  def testCrossProjectingOnTuple(): Unit = {
+    val test = tuples.cross(tuples).projecting {
+      case ((_, v1), (_, v2)) => s"$v1 $v2"
+    }
+    assertIsCross(test, "projecting for cross on tuples should produce a CrossOperator")
+  }
+
+  @Test
+  def testCrossProjectingOnCaseClass(): Unit = {
+    val test = caseObjects.cross(caseObjects).projecting {
+      case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => s"$v1 $v2"
+    }
+    assertIsCross(test, "projecting for cross on case objects should produce a CrossOperator")
+  }
+
+  /** Fails with `message` unless `ds` is backed by a Java [[CrossOperator]]. */
+  private def assertIsCross(ds: DataSet[_], message: String): Unit =
+    assert(ds.javaSet.isInstanceOf[CrossOperator[_, _, _]], message)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSetTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSetTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSetTest.scala
new file mode 100644
index 0000000..d4d6244
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSetTest.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.api.java.operators._
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.api.scala.extensions.data.KeyValuePair
+import org.junit.Test
+
+/**
+  * Tests for the `acceptPartialFunctions` methods available on a plain [[DataSet]]:
+  * each test passes a partial function to one of the uniquely named methods and
+  * checks which Java operator backs the resulting data set.
+  *
+  * The user functions are deliberately free of side effects: functions handed to
+  * Flink are shipped to the workers, so mutating objects captured from the test
+  * would not be reliably observable here anyway.
+  */
+class OnDataSetTest extends AcceptPFTestBase {
+
+  @Test
+  def testMapWithOnTuple(): Unit = {
+    val test =
+      tuples.mapWith {
+        case (id, value) => s"$id $value"
+      }
+    assert(test.javaSet.isInstanceOf[MapOperator[_, _]],
+      "mapWith should produce a MapOperator")
+  }
+
+  @Test
+  def testMapWithOnCaseClass(): Unit = {
+    val test =
+      caseObjects.mapWith {
+        case KeyValuePair(id, value) => s"$id $value"
+      }
+    assert(test.javaSet.isInstanceOf[MapOperator[_, _]],
+      "mapWith should produce a MapOperator")
+  }
+
+  @Test
+  def testMapPartitionWithOnTuple(): Unit = {
+    val test =
+      tuples.mapPartitionWith {
+        case (id, value) #:: _ => s"$id $value"
+      }
+    assert(test.javaSet.isInstanceOf[MapPartitionOperator[_, _]],
+      "mapPartitionWith should produce a MapPartitionOperator")
+  }
+
+  @Test
+  def testMapPartitionWithOnCaseClass(): Unit = {
+    val test =
+      caseObjects.mapPartitionWith {
+        case KeyValuePair(id, value) #:: _ => s"$id $value"
+      }
+    assert(test.javaSet.isInstanceOf[MapPartitionOperator[_, _]],
+      "mapPartitionWith should produce a MapPartitionOperator")
+  }
+
+  @Test
+  def testFlatMapWithOnTuple(): Unit = {
+    val test =
+      tuples.flatMapWith {
+        case (id, value) => List(id.toString, value)
+      }
+    assert(test.javaSet.isInstanceOf[FlatMapOperator[_, _]],
+      "flatMapWith should produce a FlatMapOperator")
+  }
+
+  @Test
+  def testFlatMapWithOnCaseClass(): Unit = {
+    val test =
+      caseObjects.flatMapWith {
+        case KeyValuePair(id, value) => List(id.toString, value)
+      }
+    assert(test.javaSet.isInstanceOf[FlatMapOperator[_, _]],
+      "flatMapWith should produce a FlatMapOperator")
+  }
+
+  @Test
+  def testFilterWithOnTuple(): Unit = {
+    val test =
+      tuples.filterWith {
+        case (id, _) => id == 1
+      }
+    assert(test.javaSet.isInstanceOf[FilterOperator[_]],
+      "filterWith should produce a FilterOperator")
+  }
+
+  @Test
+  def testFilterWithOnCaseClass(): Unit = {
+    val test =
+      caseObjects.filterWith {
+        case KeyValuePair(id, _) => id == 1
+      }
+    assert(test.javaSet.isInstanceOf[FilterOperator[_]],
+      "filterWith should produce a FilterOperator")
+  }
+
+  @Test
+  def testReduceWithOnTuple(): Unit = {
+    val test =
+      tuples.reduceWith {
+        case ((_, v1), (_, v2)) => (0, s"$v1 $v2")
+      }
+    assert(test.javaSet.isInstanceOf[ReduceOperator[_]],
+      "reduceWith should produce a ReduceOperator")
+  }
+
+  @Test
+  def testReduceWithOnCaseClass(): Unit = {
+    val test =
+      caseObjects.reduceWith {
+        case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => KeyValuePair(0, s"$v1 $v2")
+      }
+    assert(test.javaSet.isInstanceOf[ReduceOperator[_]],
+      "reduceWith should produce a ReduceOperator")
+  }
+
+  @Test
+  def testReduceGroupWithOnTuple(): Unit = {
+    // Emit the first value of the group instead of mutating a captured buffer.
+    val test =
+      tuples.reduceGroupWith {
+        case (_, value) #:: _ => value
+      }
+    assert(test.javaSet.isInstanceOf[GroupReduceOperator[_, _]],
+      "reduceGroupWith should produce a GroupReduceOperator")
+  }
+
+  @Test
+  def testReduceGroupWithOnCaseClass(): Unit = {
+    // Emit the first value of the group instead of mutating a captured buffer.
+    val test =
+      caseObjects.reduceGroupWith {
+        case KeyValuePair(_, value) #:: _ => value
+      }
+    assert(test.javaSet.isInstanceOf[GroupReduceOperator[_, _]],
+      "reduceGroupWith should produce a GroupReduceOperator")
+  }
+
+  @Test
+  def testGroupingByOnTuple(): Unit = {
+    val test =
+      tuples.groupingBy {
+        case (id, _) => id
+      }
+    assert(test.isInstanceOf[GroupedDataSet[_]],
+      "groupingBy should produce a GroupedDataSet")
+  }
+
+  @Test
+  def testGroupingByOnCaseClass(): Unit = {
+    val test =
+      caseObjects.groupingBy {
+        case KeyValuePair(id, _) => id
+      }
+    assert(test.isInstanceOf[GroupedDataSet[_]],
+      "groupingBy should produce a GroupedDataSet")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSetTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSetTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSetTest.scala
new file mode 100644
index 0000000..898c4b0
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSetTest.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.java.operators._
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.api.scala.extensions.data.KeyValuePair
+import org.junit.Test
+
+/**
+  * Tests for the `acceptPartialFunctions` methods available on a [[GroupedDataSet]]:
+  * each test passes a partial function to one of the uniquely named methods and
+  * checks the kind of data set (or backing Java operator) it produces.
+  *
+  * The user functions are deliberately free of side effects: functions handed to
+  * Flink are shipped to the workers, so mutating objects captured from the test
+  * would not be reliably observable here anyway.
+  */
+class OnGroupedDataSetTest extends AcceptPFTestBase {
+
+  @Test
+  def testSortGroupWithOnTuple(): Unit = {
+    val test =
+      groupedTuples.sortGroupWith(Order.ASCENDING) {
+        case (id, _) => id
+      }
+    assert(test.isInstanceOf[GroupedDataSet[_]],
+      "sortGroupWith on tuples should produce a GroupedDataSet")
+  }
+
+  @Test
+  def testSortGroupWithOnCaseClass(): Unit = {
+    val test =
+      groupedCaseObjects.sortGroupWith(Order.ASCENDING) {
+        case KeyValuePair(id, _) => id
+      }
+    assert(test.isInstanceOf[GroupedDataSet[_]],
+      "sortGroupWith on case objects should produce a GroupedDataSet")
+  }
+
+  @Test
+  def testReduceWithOnTuple(): Unit = {
+    val test =
+      groupedTuples.reduceWith {
+        case ((_, v1), (_, v2)) => (0, s"$v1 $v2")
+      }
+
+    assert(test.javaSet.isInstanceOf[ReduceOperator[_]],
+      "reduceWith on tuples should produce a ReduceOperator")
+  }
+
+  @Test
+  def testReduceWithOnCaseClass(): Unit = {
+    val test =
+      groupedCaseObjects.reduceWith {
+        case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => KeyValuePair(0, s"$v1 $v2")
+      }
+
+    assert(test.javaSet.isInstanceOf[ReduceOperator[_]],
+      "reduceWith on case objects should produce a ReduceOperator")
+  }
+
+  @Test
+  def testReduceGroupWithOnTuple(): Unit = {
+    // Emit the first value of each group instead of mutating a captured buffer.
+    val test =
+      groupedTuples.reduceGroupWith {
+        case (_, value) #:: _ => value
+      }
+
+    assert(test.javaSet.isInstanceOf[GroupReduceOperator[_, _]],
+      "reduceGroupWith on tuples should produce a GroupReduceOperator")
+  }
+
+  @Test
+  def testReduceGroupWithOnCaseClass(): Unit = {
+    // Emit the first value of each group instead of mutating a captured buffer.
+    val test =
+      groupedCaseObjects.reduceGroupWith {
+        case KeyValuePair(_, value) #:: _ => value
+      }
+
+    assert(test.javaSet.isInstanceOf[GroupReduceOperator[_, _]],
+      "reduceGroupWith on case objects should produce a GroupReduceOperator")
+  }
+
+  @Test
+  def testCombineGroupWithOnTuple(): Unit = {
+    // Emit the first value of each group instead of mutating a captured buffer.
+    val test =
+      groupedTuples.combineGroupWith {
+        case (_, value) #:: _ => value
+      }
+
+    assert(test.javaSet.isInstanceOf[GroupCombineOperator[_, _]],
+      "combineGroupWith on tuples should produce a GroupCombineOperator")
+  }
+
+  @Test
+  def testCombineGroupWithOnCaseClass(): Unit = {
+    // Emit the first value of each group instead of mutating a captured buffer.
+    val test =
+      groupedCaseObjects.combineGroupWith {
+        case KeyValuePair(_, value) #:: _ => value
+      }
+
+    assert(test.javaSet.isInstanceOf[GroupCombineOperator[_, _]],
+      "combineGroupWith on case objects should produce a GroupCombineOperator")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnHalfUnfinishedKeyPairOperationTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnHalfUnfinishedKeyPairOperationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnHalfUnfinishedKeyPairOperationTest.scala
new file mode 100644
index 0000000..dca2208
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnHalfUnfinishedKeyPairOperationTest.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.api.java.operators.CoGroupOperator
+import org.apache.flink.api.java.operators.JoinOperator.EquiJoin
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.api.scala.extensions.data.KeyValuePair
+import org.junit.Test
+
+class OnHalfUnfinishedKeyPairOperationTest extends AcceptPFTestBase {
+  // Checks what isEqualTo yields for every join flavour and for coGroup.
+  @Test
+  def testInnerJoinIsEqualToOnTuple(): Unit = {
+    val keyedOnLeft = tuples.join(tuples).whereClause {
+      case (id, _) => id
+    }
+    val result = keyedOnLeft.isEqualTo {
+      case (id, _) => id
+    }
+    assert(result.javaSet.isInstanceOf[EquiJoin[_, _, _]],
+      "isEqualTo for inner join on tuples should produce a EquiJoin")
+  }
+
+  @Test
+  def testInnerJoinIsEqualToOnCaseClass(): Unit = {
+    val keyedOnLeft = caseObjects.join(caseObjects).whereClause {
+      case KeyValuePair(id, _) => id
+    }
+    val result = keyedOnLeft.isEqualTo {
+      case KeyValuePair(id, _) => id
+    }
+    assert(result.javaSet.isInstanceOf[EquiJoin[_, _, _]],
+      "isEqualTo for inner join on case objects should produce a EquiJoin")
+  }
+
+  @Test
+  def testRightOuterJoinIsEqualToOnTuple(): Unit = {
+    val keyedOnLeft = tuples.rightOuterJoin(tuples).whereClause {
+      case (id, _) => id
+    }
+    val result = keyedOnLeft.isEqualTo {
+      case (id, _) => id
+    }
+    assert(result.isInstanceOf[JoinFunctionAssigner[_, _]],
+      "isEqualTo for right outer join on tuples should produce a JoinFunctionAssigner")
+  }
+
+  @Test
+  def testRightOuterJoinIsEqualToOnCaseClass(): Unit = {
+    val keyedOnLeft = caseObjects.rightOuterJoin(caseObjects).whereClause {
+      case KeyValuePair(id, _) => id
+    }
+    val result = keyedOnLeft.isEqualTo {
+      case KeyValuePair(id, _) => id
+    }
+    assert(result.isInstanceOf[JoinFunctionAssigner[_, _]],
+      "isEqualTo for right outer join on case objects should produce a JoinFunctionAssigner")
+  }
+
+  @Test
+  def testLeftOuterJoinIsEqualToOnTuple(): Unit = {
+    val keyedOnLeft = tuples.leftOuterJoin(tuples).whereClause {
+      case (id, _) => id
+    }
+    val result = keyedOnLeft.isEqualTo {
+      case (id, _) => id
+    }
+    assert(result.isInstanceOf[JoinFunctionAssigner[_, _]],
+      "isEqualTo for left outer join on tuples should produce a JoinFunctionAssigner")
+  }
+
+  @Test
+  def testLeftOuterJoinIsEqualToOnCaseClass(): Unit = {
+    val keyedOnLeft = caseObjects.leftOuterJoin(caseObjects).whereClause {
+      case KeyValuePair(id, _) => id
+    }
+    val result = keyedOnLeft.isEqualTo {
+      case KeyValuePair(id, _) => id
+    }
+    assert(result.isInstanceOf[JoinFunctionAssigner[_, _]],
+      "isEqualTo for left outer join on case objects should produce a JoinFunctionAssigner")
+  }
+
+  @Test
+  def testFullOuterJoinIsEqualToOnTuple(): Unit = {
+    val keyedOnLeft = tuples.fullOuterJoin(tuples).whereClause {
+      case (id, _) => id
+    }
+    val result = keyedOnLeft.isEqualTo {
+      case (id, _) => id
+    }
+    assert(result.isInstanceOf[JoinFunctionAssigner[_, _]],
+      "isEqualTo for full outer join on tuples should produce a JoinFunctionAssigner")
+  }
+
+  @Test
+  def testFullOuterJoinIsEqualToOnCaseClass(): Unit = {
+    val keyedOnLeft = caseObjects.fullOuterJoin(caseObjects).whereClause {
+      case KeyValuePair(id, _) => id
+    }
+    val result = keyedOnLeft.isEqualTo {
+      case KeyValuePair(id, _) => id
+    }
+    assert(result.isInstanceOf[JoinFunctionAssigner[_, _]],
+      "isEqualTo for full outer join on case objects should produce a JoinFunctionAssigner")
+  }
+
+  @Test
+  def testCoGroupIsEqualToOnTuple(): Unit = {
+    val keyedOnLeft = tuples.coGroup(tuples).whereClause {
+      case (id, _) => id
+    }
+    val result = keyedOnLeft.isEqualTo {
+      case (id, _) => id
+    }
+    assert(result.javaSet.isInstanceOf[CoGroupOperator[_, _, _]],
+      "isEqualTo for co-group on tuples should produce a CoGroupOperator")
+  }
+
+  @Test
+  def testCoGroupIsEqualToOnCaseClass(): Unit = {
+    val keyedOnLeft = caseObjects.coGroup(caseObjects).whereClause {
+      case KeyValuePair(id, _) => id
+    }
+    val result = keyedOnLeft.isEqualTo {
+      case KeyValuePair(id, _) => id
+    }
+    assert(result.javaSet.isInstanceOf[CoGroupOperator[_, _, _]],
+      "isEqualTo for co-group on case objects should produce a CoGroupOperator")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnJoinFunctionAssignerTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnJoinFunctionAssignerTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnJoinFunctionAssignerTest.scala
new file mode 100644
index 0000000..52e31ae
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnJoinFunctionAssignerTest.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.api.java.operators.JoinOperator.EquiJoin
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.api.scala.extensions.data.KeyValuePair
+import org.junit.Test
+
+class OnJoinFunctionAssignerTest extends AcceptPFTestBase {
+  // Checks that projecting after isEqualTo builds an EquiJoin for every join flavour.
+  @Test
+  def testInnerJoinProjectingOnTuple(): Unit = {
+    val assigner = tuples.join(tuples).whereClause {
+      case (id, _) => id
+    }.isEqualTo {
+      case (id, _) => id
+    }
+    val projected = assigner.projecting {
+      case ((_, v1), (_, v2)) => s"$v1 $v2"
+    }
+    assert(projected.javaSet.isInstanceOf[EquiJoin[_, _, _]],
+      "projecting inner join on tuples should produce a EquiJoin")
+  }
+
+  @Test
+  def testInnerJoinProjectingOnCaseClass(): Unit = {
+    val assigner = caseObjects.join(caseObjects).whereClause {
+      case KeyValuePair(id, _) => id
+    }.isEqualTo {
+      case KeyValuePair(id, _) => id
+    }
+    val projected = assigner.projecting {
+      case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => s"$v1 $v2"
+    }
+    assert(projected.javaSet.isInstanceOf[EquiJoin[_, _, _]],
+      "projecting inner join on case objects should produce a EquiJoin")
+  }
+
+  @Test
+  def testRightOuterJoinProjectingOnTuple(): Unit = {
+    val assigner = tuples.rightOuterJoin(tuples).whereClause {
+      case (id, _) => id
+    }.isEqualTo {
+      case (id, _) => id
+    }
+    val projected = assigner.projecting {
+      case ((_, v1), (_, v2)) => s"$v1 $v2"
+    }
+    assert(projected.javaSet.isInstanceOf[EquiJoin[_, _, _]],
+      "projecting right outer join on tuples should produce a EquiJoin")
+  }
+
+  @Test
+  def testRightOuterJoinProjectingOnCaseClass(): Unit = {
+    val assigner = caseObjects.rightOuterJoin(caseObjects).whereClause {
+      case KeyValuePair(id, _) => id
+    }.isEqualTo {
+      case KeyValuePair(id, _) => id
+    }
+    val projected = assigner.projecting {
+      case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => s"$v1 $v2"
+    }
+    assert(projected.javaSet.isInstanceOf[EquiJoin[_, _, _]],
+      "projecting right outer join on case objects should produce a EquiJoin")
+  }
+
+  @Test
+  def testLeftOuterJoinProjectingOnTuple(): Unit = {
+    val assigner = tuples.leftOuterJoin(tuples).whereClause {
+      case (id, _) => id
+    }.isEqualTo {
+      case (id, _) => id
+    }
+    val projected = assigner.projecting {
+      case ((_, v1), (_, v2)) => s"$v1 $v2"
+    }
+    assert(projected.javaSet.isInstanceOf[EquiJoin[_, _, _]],
+      "projecting left outer join on tuples should produce a EquiJoin")
+  }
+
+  @Test
+  def testLeftOuterJoinProjectingOnCaseClass(): Unit = {
+    val assigner = caseObjects.leftOuterJoin(caseObjects).whereClause {
+      case KeyValuePair(id, _) => id
+    }.isEqualTo {
+      case KeyValuePair(id, _) => id
+    }
+    val projected = assigner.projecting {
+      case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => s"$v1 $v2"
+    }
+    assert(projected.javaSet.isInstanceOf[EquiJoin[_, _, _]],
+      "projecting left outer join on case objects should produce a EquiJoin")
+  }
+
+  @Test
+  def testFullOuterJoinProjectingOnTuple(): Unit = {
+    val assigner = tuples.fullOuterJoin(tuples).whereClause {
+      case (id, _) => id
+    }.isEqualTo {
+      case (id, _) => id
+    }
+    val projected = assigner.projecting {
+      case ((_, v1), (_, v2)) => s"$v1 $v2"
+    }
+    assert(projected.javaSet.isInstanceOf[EquiJoin[_, _, _]],
+      "projecting full outer join on tuples should produce a EquiJoin")
+  }
+
+  @Test
+  def testFullOuterJoinProjectingOnCaseClass(): Unit = {
+    val assigner = caseObjects.fullOuterJoin(caseObjects).whereClause {
+      case KeyValuePair(id, _) => id
+    }.isEqualTo {
+      case KeyValuePair(id, _) => id
+    }
+    val projected = assigner.projecting {
+      case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => s"$v1 $v2"
+    }
+    assert(projected.javaSet.isInstanceOf[EquiJoin[_, _, _]],
+      "projecting full outer join on case objects should produce a EquiJoin")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnUnfinishedKeyPairOperationTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnUnfinishedKeyPairOperationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnUnfinishedKeyPairOperationTest.scala
new file mode 100644
index 0000000..b454699
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnUnfinishedKeyPairOperationTest.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.api.scala.extensions.data.KeyValuePair
+import org.junit.Test
+
+class OnUnfinishedKeyPairOperationTest extends AcceptPFTestBase {
+  // whereClause on every join flavour and on coGroup must yield a HalfUnfinishedKeyPairOperation.
+  @Test
+  def testInnerJoinWhereClauseOnTuple(): Unit = {
+    val test =
+      tuples.join(tuples).whereClause {
+        case (id, _) => id
+      }
+    assert(test.isInstanceOf[HalfUnfinishedKeyPairOperation[_, _, _]],
+      "whereClause for inner join on tuples should produce a HalfUnfinishedKeyPairOperation")
+  }
+
+  @Test
+  def testInnerJoinWhereClauseOnCaseClass(): Unit = {
+    val test =
+      caseObjects.join(caseObjects).whereClause {
+        case KeyValuePair(id, _) => id
+      }
+    assert(test.isInstanceOf[HalfUnfinishedKeyPairOperation[_, _, _]],
+      "whereClause for inner join on case objects " +
+        "should produce a HalfUnfinishedKeyPairOperation")
+  }
+
+  @Test
+  def testRightOuterJoinWhereClauseOnTuple(): Unit = {
+    val test =
+      tuples.rightOuterJoin(tuples).whereClause {
+        case (id, _) => id
+      }
+    assert(test.isInstanceOf[HalfUnfinishedKeyPairOperation[_, _, _]],
+      "whereClause for right outer join on tuples " +
+        "should produce a HalfUnfinishedKeyPairOperation")
+  }
+
+  @Test
+  def testRightOuterJoinWhereClauseOnCaseClass(): Unit = {
+    val test =
+      caseObjects.rightOuterJoin(caseObjects).whereClause {
+        case KeyValuePair(id, _) => id
+      }
+    assert(test.isInstanceOf[HalfUnfinishedKeyPairOperation[_, _, _]],
+      "whereClause for right outer join on case objects " +
+        "should produce a HalfUnfinishedKeyPairOperation")
+  }
+
+  @Test
+  def testLeftOuterJoinWhereClauseOnTuple(): Unit = {
+    val test =
+      tuples.leftOuterJoin(tuples).whereClause {
+        case (id, _) => id
+      }
+    assert(test.isInstanceOf[HalfUnfinishedKeyPairOperation[_, _, _]],
+      "whereClause for left outer join on tuples " +
+        "should produce a HalfUnfinishedKeyPairOperation")
+  }
+
+  @Test
+  def testLeftOuterJoinWhereClauseOnCaseClass(): Unit = {
+    val test =
+      caseObjects.leftOuterJoin(caseObjects).whereClause {
+        case KeyValuePair(id, _) => id
+      }
+    assert(test.isInstanceOf[HalfUnfinishedKeyPairOperation[_, _, _]],
+      "whereClause for left outer join on case objects " +
+        "should produce a HalfUnfinishedKeyPairOperation")
+  }
+
+  @Test
+  def testFullOuterJoinWhereClauseOnTuple(): Unit = {
+    val test =
+      tuples.fullOuterJoin(tuples).whereClause {
+        case (id, _) => id
+      }
+    assert(test.isInstanceOf[HalfUnfinishedKeyPairOperation[_, _, _]],
+      "whereClause for full outer join on tuples " +
+        "should produce a HalfUnfinishedKeyPairOperation")
+  }
+
+  @Test
+  def testFullOuterJoinWhereClauseOnCaseClass(): Unit = {
+    val test =
+      caseObjects.fullOuterJoin(caseObjects).whereClause {
+        case KeyValuePair(id, _) => id
+      }
+    assert(test.isInstanceOf[HalfUnfinishedKeyPairOperation[_, _, _]],
+      "whereClause for full outer join on case objects " +
+        "should produce a HalfUnfinishedKeyPairOperation")
+  }
+
+  @Test
+  def testCoGroupWhereClauseOnTuple(): Unit = {
+    val test =
+      tuples.coGroup(tuples).whereClause {
+        case (id, _) => id
+      }
+    assert(test.isInstanceOf[HalfUnfinishedKeyPairOperation[_, _, _]],
+      "whereClause for co-group on tuples " +
+        "should produce a HalfUnfinishedKeyPairOperation")
+  }
+
+  @Test
+  def testCoGroupWhereClauseOnCaseClass(): Unit = {
+    val test =
+      caseObjects.coGroup(caseObjects).whereClause {
+        case KeyValuePair(id, _) => id
+      }
+    assert(test.isInstanceOf[HalfUnfinishedKeyPairOperation[_, _, _]],
+      "whereClause for co-group on case objects " +
+        "should produce a HalfUnfinishedKeyPairOperation")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index 669f12e..141625e 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -18,14 +18,13 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.annotation.{PublicEvolving, Internal, Public}
+import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream, SingleOutputStreamOperator, KeyedStream}
+import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream}
 import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction}
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
-import org.apache.flink.streaming.api.transformations.TwoInputTransformation
 import org.apache.flink.util.Collector
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
index 4d09dae..93b5cc8 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
@@ -73,7 +73,7 @@ class JoinedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
     * A join operation that has a [[KeySelector]] defined for the first input.
     *
     * You need to specify a [[KeySelector]] for the second input using [[equalTo()]]
-    * before you can proceeed with specifying a [[WindowAssigner]] using [[EqualTo.window()]].
+    * before you can proceed with specifying a [[WindowAssigner]] using [[EqualTo.window()]].
     *
     * @tparam KEY Type of the key. This must be the same for both inputs
     */

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala
new file mode 100644
index 0000000..deb03a3
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.{ConnectedStreams, DataStream}
+
+/**
+  * Wraps [[ConnectedStreams]] so that anonymous partial functions (`{ case ... => }`)
+  * can destructure the tuples, case class instances or collections on either side.
+  *
+  * @param stream The connected streams every call is delegated to
+  * @tparam IN1 Element type of the first connected stream
+  * @tparam IN2 Element type of the second connected stream
+  */
+class OnConnectedStream[IN1, IN2](stream: ConnectedStreams[IN1, IN2]) {
+
+  /**
+    * CoMap transformation: maps both connected streams into one stream of `R`.
+    *
+    * `map1` is called for every element of the first connected stream and
+    * `map2` for every element of the second one; both yield values of the
+    * same result type.
+    *
+    * @param map1 Mapping applied to each element of the first input.
+    * @param map2 Mapping applied to each element of the second input.
+    * @return The data stream produced by the CoMap.
+    */
+  @PublicEvolving
+  def mapWith[R: TypeInformation](map1: IN1 => R, map2: IN2 => R): DataStream[R] =
+    stream.map[R](map1, map2)
+
+  /**
+    * CoFlatMap transformation: turns each element of either connected stream
+    * into zero or more elements of a single output stream of `R`.
+    *
+    * `flatMap1` handles the elements of the first connected stream and
+    * `flatMap2` those of the second one.
+    *
+    * @param flatMap1 Function applied to each element of the first input.
+    * @param flatMap2 Function applied to each element of the second input.
+    * @return The data stream produced by the CoFlatMap.
+    */
+  @PublicEvolving
+  def flatMapWith[R: TypeInformation](
+      flatMap1: IN1 => TraversableOnce[R], flatMap2: IN2 => TraversableOnce[R]): DataStream[R] =
+    stream.flatMap[R](flatMap1, flatMap2)
+
+  /**
+    * Keys both connected streams. Once keyed, every element of either stream
+    * that has a given key is sent to the same parallel instance of the
+    * transformation functions applied afterwards.
+    *
+    * @param key1 Key extractor for elements of the first stream
+    * @param key2 Key extractor for elements of the second stream
+    * @return The connected streams, keyed on both sides
+    */
+  @PublicEvolving
+  def keyingBy[K1: TypeInformation, K2: TypeInformation](key1: IN1 => K1, key2: IN2 => K2):
+      ConnectedStreams[IN1, IN2] =
+    stream.keyBy[K1, K2](key1, key2)
+
+}


Mime
View raw message