flink-commits mailing list archives

From aljos...@apache.org
Subject flink git commit: [FLINK-3538] [api-breaking] Streamline Scala DataStream.join/coGroup
Date Mon, 29 Feb 2016 19:27:40 GMT
Repository: flink
Updated Branches:
  refs/heads/master 9580b8fe5 -> 0ac2b1a7b


[FLINK-3538] [api-breaking] Streamline Scala DataStream.join/coGroup

This enforces that the user always has to specify keys for both inputs
before .window() can be called.
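
For illustration, a minimal sketch of the call order this enforces, modeled on the WindowJoin example changed below (the grades/salaries streams, the Person case class, and the window imports are assumed to be as in that example):

    // Keys for both inputs must be supplied via where()/equalTo();
    // only the resulting EqualTo stage exposes window().
    val joined: DataStream[Person] = grades.join(salaries)
      .where(_.name)      // key selector for the first input
      .equalTo(_.name)    // key selector for the second input
      .window(SlidingTimeWindows.of(Time.seconds(2), Time.seconds(1)))
      .apply { (g, s) => Person(g.name, g.grade, s.salary) }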


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ac2b1a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ac2b1a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ac2b1a7

Branch: refs/heads/master
Commit: 0ac2b1a7b4b44d0e5722532958e5bda00615dbb4
Parents: 9580b8f
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Feb 29 17:02:38 2016 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Feb 29 20:26:14 2016 +0100

----------------------------------------------------------------------
 .../scala/examples/join/WindowJoin.scala        |   4 +-
 .../streaming/api/scala/CoGroupedStreams.scala  | 326 +++++++----------
 .../flink/streaming/api/scala/DataStream.scala  |   8 +-
 .../streaming/api/scala/JoinedStreams.scala     | 357 +++++++------------
 .../StreamingScalaAPICompletenessTest.scala     |   6 +-
 5 files changed, 261 insertions(+), 440 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ac2b1a7/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
index 81f12dc..50a2216 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.streaming.scala.examples.join
 
-import java.util.concurrent.TimeUnit
-
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala._
@@ -58,7 +56,7 @@ object WindowJoin {
     val joined = grades.join(salaries)
         .where(_.name)
         .equalTo(_.name)
-        .window(SlidingTimeWindows.of(Time.of(2, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
+        .window(SlidingTimeWindows.of(Time.seconds(2), Time.seconds(1)))
         .apply { (g, s) => Person(g.name, g.grade, s.salary) }
 
     if (params.has("output")) {
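
The two window specifications above are equivalent; Time.seconds(n) is shorthand for Time.of(n, TimeUnit.SECONDS), which is why the java.util.concurrent.TimeUnit import can be dropped:

    // The same 2s-size / 1s-slide sliding window, written the old and the new way.
    SlidingTimeWindows.of(Time.of(2, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))
    SlidingTimeWindows.of(Time.seconds(2), Time.seconds(1))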

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac2b1a7/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
index f4ab2ee..4cce9e2 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
@@ -56,252 +56,164 @@ import scala.collection.JavaConverters._
  * } }}}
  */
 @Public
-object CoGroupedStreams {
+class CoGroupedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
 
   /**
-   * A co-group operation that does not yet have its [[KeySelector]]s defined.
-   *
-   * @tparam T1 Type of the elements from the first input
-   * @tparam T2 Type of the elements from the second input
+   * Specifies a [[KeySelector]] for elements from the first input.
    */
-  class Unspecified[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
-
-    /**
-     * Specifies a [[KeySelector]] for elements from the first input.
-     */
-    def where[KEY: TypeInformation](keySelector: T1 => KEY): WithKey[T1, T2, KEY] = {
-      val cleanFun = clean(keySelector)
-      val keyType = implicitly[TypeInformation[KEY]]
-      val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
-        def getKey(in: T1) = cleanFun(in)
-        override def getProducedType: TypeInformation[KEY] = keyType
-      }
-      new WithKey[T1, T2, KEY](input1, input2, javaSelector, null, keyType)
-    }
-
-    /**
-     * Specifies a [[KeySelector]] for elements from the second input.
-     */
-    def equalTo[KEY: TypeInformation](keySelector: T2 => KEY): WithKey[T1, T2, KEY] = {
-      val cleanFun = clean(keySelector)
-      val keyType = implicitly[TypeInformation[KEY]]
-      val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] {
-        def getKey(in: T2) = cleanFun(in)
-        override def getProducedType: TypeInformation[KEY] = keyType
-      }
-      new WithKey[T1, T2, KEY](input1, input2, null, javaSelector, keyType)
-    }
-
-    /**
-     * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-     * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
-     */
-    private[flink] def clean[F <: AnyRef](f: F): F = {
-      new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f)
+  def where[KEY: TypeInformation](keySelector: T1 => KEY): Where[KEY] = {
+    val cleanFun = clean(keySelector)
+    val keyType = implicitly[TypeInformation[KEY]]
+    val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
+      def getKey(in: T1) = cleanFun(in)
+      override def getProducedType: TypeInformation[KEY] = keyType
     }
+    new Where[KEY](javaSelector, keyType)
   }
 
   /**
-   * A co-group operation that has [[KeySelector]]s defined for either both or
-   * one input.
+   * A co-group operation that has [[KeySelector]]s defined for the first input.
    *
-   * You need to specify a [[KeySelector]] for both inputs using [[where()]] and [[equalTo()]]
-   * before you can proceeed with specifying a [[WindowAssigner]] using [[window()]].
+   * You need to specify a [[KeySelector]] for the second input using [[equalTo()]]
+   * before you can proceed with specifying a [[WindowAssigner]] using [[EqualTo.window()]].
    *
-   * @tparam T1 Type of the elements from the first input
-   * @tparam T2 Type of the elements from the second input
    * @tparam KEY Type of the key. This must be the same for both inputs
    */
-  class WithKey[T1, T2, KEY](
-      input1: DataStream[T1],
-      input2: DataStream[T2],
-      keySelector1: KeySelector[T1, KEY],
-      keySelector2: KeySelector[T2, KEY],
-      keyType: TypeInformation[KEY]) {
-
-    /**
-     * Specifies a [[KeySelector]] for elements from the first input.
-     */
-    def where(keySelector: T1 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
-      val cleanFun = clean(keySelector)
-      val localKeyType = keyType
-      val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
-        def getKey(in: T1) = cleanFun(in)
-        override def getProducedType: TypeInformation[KEY] = localKeyType
-      }
-      new WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2, keyType)
-    }
+  class Where[KEY](keySelector1: KeySelector[T1, KEY], keyType: TypeInformation[KEY]) {
 
     /**
      * Specifies a [[KeySelector]] for elements from the second input.
      */
-    def equalTo(keySelector: T2 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
+    def equalTo(keySelector: T2 => KEY): EqualTo = {
       val cleanFun = clean(keySelector)
       val localKeyType = keyType
       val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] {
         def getKey(in: T2) = cleanFun(in)
         override def getProducedType: TypeInformation[KEY] = localKeyType
       }
-      new WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector, keyType)
-    }
-
-    /**
-     * Specifies the window on which the co-group operation works.
-     */
-    @PublicEvolving
-    def window[W <: Window](
-        assigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W])
-        : CoGroupedStreams.WithWindow[T1, T2, KEY, W] = {
-      if (keySelector1 == null || keySelector2 == null) {
-        throw new UnsupportedOperationException("You first need to specify KeySelectors for both" +
-          "inputs using where() and equalTo().")
-      }
-      new CoGroupedStreams.WithWindow[T1, T2, KEY, W](
-        input1,
-        input2,
-        keySelector1,
-        keySelector2,
-        clean(assigner),
-        null,
-        null)
-    }
-
-    /**
-     * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-     * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
-     */
-    private[flink] def clean[F <: AnyRef](f: F): F = {
-      new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f)
-    }
-  }
-
-  /**
-   * A co-group operation that has [[KeySelector]]s defined for both inputs as
-   * well as a [[WindowAssigner]].
-   *
-   * @tparam T1 Type of the elements from the first input
-   * @tparam T2 Type of the elements from the second input
-   * @tparam KEY Type of the key. This must be the same for both inputs
-   * @tparam W Type of { @link Window} on which the co-group operation works.
-   */
-  @PublicEvolving
-  class WithWindow[T1, T2, KEY, W <: Window](
-      input1: DataStream[T1],
-      input2: DataStream[T2],
-      keySelector1: KeySelector[T1, KEY],
-      keySelector2: KeySelector[T2, KEY],
-      windowAssigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W],
-      trigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W],
-      evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) {
-
-
-    /**
-     * Sets the [[Trigger]] that should be used to trigger window emission.
-     */
-    @PublicEvolving
-    def trigger(newTrigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
-    : CoGroupedStreams.WithWindow[T1, T2, KEY, W] = {
-      new WithWindow[T1, T2, KEY, W](
-        input1,
-        input2,
-        keySelector1,
-        keySelector2,
-        windowAssigner,
-        newTrigger,
-        evictor)
+      new EqualTo(javaSelector)
     }
 
     /**
-     * Sets the [[Evictor]] that should be used to evict elements from a window before emission.
+     * A co-group operation that has a [[KeySelector]] defined for the first and the second input.
      *
-     * Note: When using an evictor window performance will degrade significantly, since
-     * pre-aggregation of window results cannot be used.
+     * A window can now be specified using [[window()]].
      */
-    @PublicEvolving
-    def evictor(newEvictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
-    : CoGroupedStreams.WithWindow[T1, T2, KEY, W] = {
-      new WithWindow[T1, T2, KEY, W](
-        input1,
-        input2,
-        keySelector1,
-        keySelector2,
-        windowAssigner,
-        trigger,
-        newEvictor)
-    }
-
-    /**
-     * Completes the co-group operation with the user function that is executed
-     * for windowed groups.
-     */
-    def apply[O: TypeInformation](
-        fun: (Iterator[T1], Iterator[T2]) => O): DataStream[O] = {
-      require(fun != null, "CoGroup function must not be null.")
-
-      val coGrouper = new CoGroupFunction[T1, T2, O] {
-        val cleanFun = clean(fun)
-        def coGroup(
-            left: java.lang.Iterable[T1],
-            right: java.lang.Iterable[T2], out: Collector[O]) = {
-          out.collect(cleanFun(left.iterator().asScala, right.iterator().asScala))
+    class EqualTo(keySelector2: KeySelector[T2, KEY]) {
+
+      /**
+       * Specifies the window on which the co-group operation works.
+       */
+      @PublicEvolving
+      def window[W <: Window](
+          assigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W])
+      : WithWindow[W] = {
+        if (keySelector1 == null || keySelector2 == null) {
+          throw new UnsupportedOperationException(
+            "You first need to specify KeySelectors for both inputs using where() and equalTo().")
         }
+        new WithWindow[W](clean(assigner), null, null)
       }
-      apply(coGrouper)
-    }
 
-    /**
-     * Completes the co-group operation with the user function that is executed
-     * for windowed groups.
-     */
-    def apply[O: TypeInformation](
-        fun: (Iterator[T1], Iterator[T2], Collector[O]) => Unit): DataStream[O] = {
-      require(fun != null, "CoGroup function must not be null.")
+      /**
+       * A co-group operation that has [[KeySelector]]s defined for both inputs as
+       * well as a [[WindowAssigner]].
+       *
+       * @tparam W Type of { @link Window} on which the co-group operation works.
+       */
+      @PublicEvolving
+      class WithWindow[W <: Window](
+          windowAssigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W],
+          trigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W],
+          evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) {
+
+        /**
+         * Sets the [[Trigger]] that should be used to trigger window emission.
+         */
+        @PublicEvolving
+        def trigger(newTrigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
+            : WithWindow[W] = {
+          new WithWindow[W](windowAssigner, newTrigger, evictor)
+        }
 
-      val coGrouper = new CoGroupFunction[T1, T2, O] {
-        val cleanFun = clean(fun)
-        def coGroup(
-            left: java.lang.Iterable[T1],
-            right: java.lang.Iterable[T2], out: Collector[O]) = {
-          cleanFun(left.iterator.asScala, right.iterator.asScala, out)
+        /**
+         * Sets the [[Evictor]] that should be used to evict elements from a window before
+         * emission.
+         *
+         * Note: When using an evictor window performance will degrade significantly, since
+         * pre-aggregation of window results cannot be used.
+         */
+        @PublicEvolving
+        def evictor(
+            newEvictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
+            : WithWindow[W] = {
+          new WithWindow[W](windowAssigner, trigger, newEvictor)
         }
-      }
-      apply(coGrouper)
-    }
 
-    /**
-     * Completes the co-group operation with the user function that is executed
-     * for windowed groups.
-     */
-    def apply[T: TypeInformation](function: CoGroupFunction[T1, T2, T]): DataStream[T] = {
+        /**
+         * Completes the co-group operation with the user function that is executed
+         * for windowed groups.
+         */
+        def apply[O: TypeInformation](
+            fun: (Iterator[T1], Iterator[T2]) => O): DataStream[O] = {
+          require(fun != null, "CoGroup function must not be null.")
+
+          val coGrouper = new CoGroupFunction[T1, T2, O] {
+            val cleanFun = clean(fun)
+            def coGroup(
+                left: java.lang.Iterable[T1],
+                right: java.lang.Iterable[T2], out: Collector[O]) = {
+              out.collect(cleanFun(left.iterator().asScala, right.iterator().asScala))
+            }
+          }
+          apply(coGrouper)
+        }
 
-      val coGroup = new JavaCoGroupedStreams[T1, T2](input1.javaStream, input2.javaStream)
+        /**
+         * Completes the co-group operation with the user function that is executed
+         * for windowed groups.
+         */
+        def apply[O: TypeInformation](
+            fun: (Iterator[T1], Iterator[T2], Collector[O]) => Unit): DataStream[O] = {
+          require(fun != null, "CoGroup function must not be null.")
+
+          val coGrouper = new CoGroupFunction[T1, T2, O] {
+            val cleanFun = clean(fun)
+            def coGroup(
+                left: java.lang.Iterable[T1],
+                right: java.lang.Iterable[T2], out: Collector[O]) = {
+              cleanFun(left.iterator.asScala, right.iterator.asScala, out)
+            }
+          }
+          apply(coGrouper)
+        }
 
-      asScalaStream(coGroup
-        .where(keySelector1)
-        .equalTo(keySelector2)
-        .window(windowAssigner)
-        .trigger(trigger)
-        .evictor(evictor)
-        .apply(clean(function), implicitly[TypeInformation[T]]))
-    }
+        /**
+         * Completes the co-group operation with the user function that is executed
+         * for windowed groups.
+         */
+        def apply[T: TypeInformation](function: CoGroupFunction[T1, T2, T]): DataStream[T] = {
+
+          val coGroup = new JavaCoGroupedStreams[T1, T2](input1.javaStream, input2.javaStream)
+
+          asScalaStream(coGroup
+            .where(keySelector1)
+            .equalTo(keySelector2)
+            .window(windowAssigner)
+            .trigger(trigger)
+            .evictor(evictor)
+            .apply(clean(function), implicitly[TypeInformation[T]]))
+        }
+      }
 
-    /**
-     * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-     * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
-     */
-    private[flink] def clean[F <: AnyRef](f: F): F = {
-      new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f)
     }
   }
 
-
   /**
-   * Creates a new co-group operation from the two given inputs.
+   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+   * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
    */
-  def createCoGroup[T1, T2](input1: DataStream[T1], input2: DataStream[T2])
-      : CoGroupedStreams.Unspecified[T1, T2] = {
-    new CoGroupedStreams.Unspecified[T1, T2](input1, input2)
+  private[flink] def clean[F <: AnyRef](f: F): F = {
+    new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f)
   }
-
 }
-

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac2b1a7/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 9c0675f..5e9c307 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -780,16 +780,16 @@ class DataStream[T](stream: JavaStream[T]) {
    * Creates a co-group operation. See [[CoGroupedStreams]] for an example of how the keys
    * and window can be specified.
    */
-  def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams.Unspecified[T, T2] = {
-    CoGroupedStreams.createCoGroup(this, otherStream)
+  def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2] = {
+    new CoGroupedStreams(this, otherStream)
   }
 
   /**
    * Creates a join operation. See [[JoinedStreams]] for an example of how the keys
    * and window can be specified.
    */
-  def join[T2](otherStream: DataStream[T2]): JoinedStreams.Unspecified[T, T2] = {
-    JoinedStreams.createJoin(this, otherStream)
+  def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2] = {
+    new JoinedStreams(this, otherStream)
   }
 
   /**
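
For callers that annotate the result of coGroup()/join() explicitly, this is the API-breaking part of the change; a sketch of the before/after types (grades and salaries are assumed example streams):

    // Before: val co: CoGroupedStreams.Unspecified[Grade, Salary] = grades.coGroup(salaries)
    // After:
    val co: CoGroupedStreams[Grade, Salary] = grades.coGroup(salaries)
    val jo: JoinedStreams[Grade, Salary] = grades.join(salaries)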

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac2b1a7/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
index 381a8cb..d2fb013 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
@@ -54,262 +54,171 @@ import org.apache.flink.util.Collector
  * } }}}
  */
 @Public
-object JoinedStreams {
+class JoinedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
 
   /**
-   * A join operation that does not yet have its [[KeySelector]]s defined.
-   *
-   * @tparam T1 Type of the elements from the first input
-   * @tparam T2 Type of the elements from the second input
+   * Specifies a [[KeySelector]] for elements from the first input.
    */
-  class Unspecified[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
-
-    /**
-     * Specifies a [[KeySelector]] for elements from the first input.
-     */
-    def where[KEY: TypeInformation](keySelector: T1 => KEY): WithKey[T1, T2, KEY] = {
-      val cleanFun = clean(keySelector)
-      val keyType = implicitly[TypeInformation[KEY]]
-      val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
-        def getKey(in: T1) = cleanFun(in)
-        override def getProducedType: TypeInformation[KEY] = keyType
-      }
-      new WithKey[T1, T2, KEY](input1, input2, javaSelector, null, keyType)
-    }
-
-    /**
-     * Specifies a [[KeySelector]] for elements from the second input.
-     */
-    def equalTo[KEY: TypeInformation](keySelector: T2 => KEY): WithKey[T1, T2, KEY] = {
-      val cleanFun = clean(keySelector)
-      val keyType = implicitly[TypeInformation[KEY]]
-      val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] {
-        def getKey(in: T2) = cleanFun(in)
-        override def getProducedType: TypeInformation[KEY] = keyType
-      }
-      new WithKey[T1, T2, KEY](input1, input2, null, javaSelector, keyType)
-    }
-
-    /**
-     * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-     * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
-     */
-    private[flink] def clean[F <: AnyRef](f: F): F = {
-      new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f)
+  def where[KEY: TypeInformation](keySelector: T1 => KEY): Where[KEY] = {
+    val cleanFun = clean(keySelector)
+    val keyType = implicitly[TypeInformation[KEY]]
+    val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
+      def getKey(in: T1) = cleanFun(in)
+      override def getProducedType: TypeInformation[KEY] = keyType
     }
+    new Where[KEY](javaSelector, keyType)
   }
 
   /**
-   * A join operation that has [[KeySelector]]s defined for either both or
-   * one input.
-   *
-   * You need to specify a [[KeySelector]] for both inputs using [[where()]] and [[equalTo()]]
-   * before you can proceeed with specifying a [[WindowAssigner]] using [[window()]].
-   *
-   * @tparam T1 Type of the elements from the first input
-   * @tparam T2 Type of the elements from the second input
-   * @tparam KEY Type of the key. This must be the same for both inputs
-   */
-  class WithKey[T1, T2, KEY](
-      input1: DataStream[T1],
-      input2: DataStream[T2],
-      keySelector1: KeySelector[T1, KEY],
-      keySelector2: KeySelector[T2, KEY],
-      keyType: TypeInformation[KEY]) {
+    * A join operation that has a [[KeySelector]] defined for the first input.
+    *
+    * You need to specify a [[KeySelector]] for the second input using [[equalTo()]]
+    * before you can proceed with specifying a [[WindowAssigner]] using [[EqualTo.window()]].
+    *
+    * @tparam KEY Type of the key. This must be the same for both inputs
+    */
+  class Where[KEY](keySelector1: KeySelector[T1, KEY], keyType: TypeInformation[KEY]) {
 
     /**
-     * Specifies a [[KeySelector]] for elements from the first input.
-     */
-    def where(keySelector: T1 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
-      val cleanFun = clean(keySelector)
-      val localKeyType = keyType
-      val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
-        def getKey(in: T1) = cleanFun(in)
-        override def getProducedType: TypeInformation[KEY] = localKeyType
-      }
-      new WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2, localKeyType)
-    }
-
-    /**
-     * Specifies a [[KeySelector]] for elements from the second input.
-     */
-    def equalTo(keySelector: T2 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
+      * Specifies a [[KeySelector]] for elements from the second input.
+      */
+    def equalTo(keySelector: T2 => KEY): EqualTo = {
       val cleanFun = clean(keySelector)
       val localKeyType = keyType
       val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] {
         def getKey(in: T2) = cleanFun(in)
         override def getProducedType: TypeInformation[KEY] = localKeyType
       }
-      new WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector, localKeyType)
+      new EqualTo(javaSelector)
     }
 
     /**
-     * Specifies the window on which the join operation works.
-     */
-    @PublicEvolving
-    def window[W <: Window](
-        assigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W])
-        : JoinedStreams.WithWindow[T1, T2, KEY, W] = {
-      if (keySelector1 == null || keySelector2 == null) {
-        throw new UnsupportedOperationException("You first need to specify KeySelectors for both" +
-          "inputs using where() and equalTo().")
-      }
-      new JoinedStreams.WithWindow[T1, T2, KEY, W](
-        input1,
-        input2,
-        keySelector1,
-        keySelector2,
-        clean(assigner),
-        null,
-        null)
-    }
-
-    /**
-     * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-     * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
-     */
-    private[flink] def clean[F <: AnyRef](f: F): F = {
-      new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f)
-    }
-  }
-
-  /**
-   * A join operation that has [[KeySelector]]s defined for both inputs as
-   * well as a [[WindowAssigner]].
-   *
-   * @tparam T1 Type of the elements from the first input
-   * @tparam T2 Type of the elements from the second input
-   * @tparam KEY Type of the key. This must be the same for both inputs
-   * @tparam W Type of { @link Window} on which the join operation works.
-   */
-  class WithWindow[T1, T2, KEY, W <: Window](
-      input1: DataStream[T1],
-      input2: DataStream[T2],
-      keySelector1: KeySelector[T1, KEY],
-      keySelector2: KeySelector[T2, KEY],
-      windowAssigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W],
-      trigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W],
-      evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) {
+      * A join operation that has a [[KeySelector]] defined for the first and the second input.
+      *
+      * A window can now be specified using [[window()]].
+      */
+    class EqualTo(keySelector2: KeySelector[T2, KEY]) {
+      /**
+        * Specifies the window on which the join operation works.
+        */
+      @PublicEvolving
+      def window[W <: Window](
+          assigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W])
+          : WithWindow[W] = {
+        if (keySelector1 == null || keySelector2 == null) {
+          throw new UnsupportedOperationException(
+            "You first need to specify KeySelectors for both inputs using where() and equalTo().")
+        }
 
+        new WithWindow[W](clean(assigner), null, null)
+      }
 
-    /**
-     * Sets the [[Trigger]] that should be used to trigger window emission.
-     */
-    @PublicEvolving
-    def trigger(newTrigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
-    : JoinedStreams.WithWindow[T1, T2, KEY, W] = {
-      new WithWindow[T1, T2, KEY, W](
-        input1,
-        input2,
-        keySelector1,
-        keySelector2,
-        windowAssigner,
-        newTrigger,
-        evictor)
-    }
+      /**
+       * A join operation that has [[KeySelector]]s defined for both inputs as
+       * well as a [[WindowAssigner]].
+       *
+       * @tparam W Type of { @link Window} on which the join operation works.
+       */
+      class WithWindow[W <: Window](
+          windowAssigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W],
+          trigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W],
+          evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) {
+
+        /**
+         * Sets the [[Trigger]] that should be used to trigger window emission.
+         */
+        @PublicEvolving
+        def trigger(newTrigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
+        : WithWindow[W] = {
+          new WithWindow[W](windowAssigner, newTrigger, evictor)
+        }
 
-    /**
-     * Sets the [[Evictor]] that should be used to evict elements from a window before emission.
-     *
-     * Note: When using an evictor window performance will degrade significantly, since
-     * pre-aggregation of window results cannot be used.
-     */
-    @PublicEvolving
-    def evictor(newEvictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
-    : JoinedStreams.WithWindow[T1, T2, KEY, W] = {
-      new WithWindow[T1, T2, KEY, W](
-        input1,
-        input2,
-        keySelector1,
-        keySelector2,
-        windowAssigner,
-        trigger,
-        newEvictor)
-    }
+        /**
+         * Sets the [[Evictor]] that should be used to evict elements from a window before emission.
+         *
+         * Note: When using an evictor window performance will degrade significantly, since
+         * pre-aggregation of window results cannot be used.
+         */
+        @PublicEvolving
+        def evictor(newEvictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
+        : WithWindow[W] = {
+          new WithWindow[W](windowAssigner, trigger, newEvictor)
+        }
 
-    /**
-     * Completes the join operation with the user function that is executed
-     * for windowed groups.
-     */
-    def apply[O: TypeInformation](fun: (T1, T2) => O): DataStream[O] = {
-      require(fun != null, "Join function must not be null.")
+        /**
+         * Completes the join operation with the user function that is executed
+         * for windowed groups.
+         */
+        def apply[O: TypeInformation](fun: (T1, T2) => O): DataStream[O] = {
+          require(fun != null, "Join function must not be null.")
+
+          val joiner = new FlatJoinFunction[T1, T2, O] {
+            val cleanFun = clean(fun)
+            def join(left: T1, right: T2, out: Collector[O]) = {
+              out.collect(cleanFun(left, right))
+            }
+          }
+          apply(joiner)
+        }
 
-      val joiner = new FlatJoinFunction[T1, T2, O] {
-        val cleanFun = clean(fun)
-        def join(left: T1, right: T2, out: Collector[O]) = {
-          out.collect(cleanFun(left, right))
+        /**
+         * Completes the join operation with the user function that is executed
+         * for windowed groups.
+         */
+        def apply[O: TypeInformation](fun: (T1, T2, Collector[O]) => Unit): DataStream[O] = {
+          require(fun != null, "Join function must not be null.")
+
+          val joiner = new FlatJoinFunction[T1, T2, O] {
+            val cleanFun = clean(fun)
+            def join(left: T1, right: T2, out: Collector[O]) = {
+              cleanFun(left, right, out)
+            }
+          }
+          apply(joiner)
         }
-      }
-      apply(joiner)
-    }
 
-    /**
-     * Completes the join operation with the user function that is executed
-     * for windowed groups.
-     */
-    def apply[O: TypeInformation](fun: (T1, T2, Collector[O]) => Unit): DataStream[O] = {
-      require(fun != null, "Join function must not be null.")
+        /**
+         * Completes the join operation with the user function that is executed
+         * for windowed groups.
+         */
+        def apply[T: TypeInformation](function: JoinFunction[T1, T2, T]): DataStream[T] = {
+
+          val join = new JavaJoinedStreams[T1, T2](input1.javaStream, input2.javaStream)
+
+          asScalaStream(join
+            .where(keySelector1)
+            .equalTo(keySelector2)
+            .window(windowAssigner)
+            .trigger(trigger)
+            .evictor(evictor)
+            .apply(clean(function), implicitly[TypeInformation[T]]))
+        }
 
-      val joiner = new FlatJoinFunction[T1, T2, O] {
-        val cleanFun = clean(fun)
-        def join(left: T1, right: T2, out: Collector[O]) = {
-          cleanFun(left, right, out)
+        /**
+         * Completes the join operation with the user function that is executed
+         * for windowed groups.
+         */
+        def apply[T: TypeInformation](function: FlatJoinFunction[T1, T2, T]): DataStream[T] = {
+
+          val join = new JavaJoinedStreams[T1, T2](input1.javaStream, input2.javaStream)
+
+          asScalaStream(join
+            .where(keySelector1)
+            .equalTo(keySelector2)
+            .window(windowAssigner)
+            .trigger(trigger)
+            .evictor(evictor)
+            .apply(clean(function), implicitly[TypeInformation[T]]))
         }
       }
-      apply(joiner)
-    }
-
-    /**
-     * Completes the join operation with the user function that is executed
-     * for windowed groups.
-     */
-    def apply[T: TypeInformation](function: JoinFunction[T1, T2, T]): DataStream[T] = {
-
-      val join = new JavaJoinedStreams[T1, T2](input1.javaStream, input2.javaStream)
-
-      asScalaStream(join
-        .where(keySelector1)
-        .equalTo(keySelector2)
-        .window(windowAssigner)
-        .trigger(trigger)
-        .evictor(evictor)
-        .apply(clean(function), implicitly[TypeInformation[T]]))
-    }
-
-    /**
-     * Completes the join operation with the user function that is executed
-     * for windowed groups.
-     */
-    def apply[T: TypeInformation](function: FlatJoinFunction[T1, T2, T]): DataStream[T] = {
-
-      val join = new JavaJoinedStreams[T1, T2](input1.javaStream, input2.javaStream)
-
-      asScalaStream(join
-        .where(keySelector1)
-        .equalTo(keySelector2)
-        .window(windowAssigner)
-        .trigger(trigger)
-        .evictor(evictor)
-        .apply(clean(function), implicitly[TypeInformation[T]]))
-    }
-
-    /**
-     * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-     * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
-     */
-    private[flink] def clean[F <: AnyRef](f: F): F = {
-      new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f)
     }
   }
 
-
   /**
-   * Creates a new join operation from the two given inputs.
+   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+   * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
    */
-  def createJoin[T1, T2](input1: DataStream[T1], input2: DataStream[T2])
-      : JoinedStreams.Unspecified[T1, T2] = {
-    new JoinedStreams.Unspecified[T1, T2](input1, input2)
+  private[flink] def clean[F <: AnyRef](f: F): F = {
+    new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f)
   }
-
 }
-

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac2b1a7/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index 415f057..e93b27b 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -21,6 +21,8 @@ import java.lang.reflect.Method
 
 import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
+import org.apache.flink.streaming.api.scala.JoinedStreams
+import org.apache.flink.streaming.api.scala.CoGroupedStreams
 
 import scala.language.existentials
 
@@ -139,11 +141,11 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
     checkMethods(
       "JoinedStreams.WithWindow", "JoinedStreams.WithWindow",
       classOf[org.apache.flink.streaming.api.datastream.JoinedStreams.WithWindow[_,_,_,_]],
-      classOf[JoinedStreams.WithWindow[_,_,_,_]])
+      classOf[JoinedStreams[_,_]#Where[_]#EqualTo#WithWindow[_]])
 
     checkMethods(
       "CoGroupedStreams.WithWindow", "CoGroupedStreams.WithWindow",
       classOf[org.apache.flink.streaming.api.datastream.CoGroupedStreams.WithWindow[_,_,_,_]],
-      classOf[CoGroupedStreams.WithWindow[_,_,_,_]])
+      classOf[CoGroupedStreams[_, _]#Where[_]#EqualTo#WithWindow[_]])
   }
 }
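
Because WithWindow now sits inside Where#EqualTo rather than at the top level, code that needs to name that stage refers to it through a type projection, as the updated test does; a sketch with assumed type parameters (Grade and Salary elements, String keys, TimeWindow):

    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    // Names the keyed-and-windowed join stage without an enclosing instance.
    type WindowedJoin =
      JoinedStreams[Grade, Salary]#Where[String]#EqualTo#WithWindow[TimeWindow]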

