flink-commits mailing list archives

From: mbala...@apache.org
Subject: [07/27] incubator-flink git commit: [scala] [streaming] Windowing functionality added to scala api
Date: Sun, 04 Jan 2015 20:50:57 GMT
[scala] [streaming] Windowing functionality added to scala api


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/80393c4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/80393c4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/80393c4a

Branch: refs/heads/master
Commit: 80393c4a1901e2672349b76cb7b3476dcb674edb
Parents: de06d95
Author: Gyula Fora <gyfora@apache.org>
Authored: Mon Dec 15 16:21:00 2014 +0100
Committer: Gyula Fora <gyfora@apache.org>
Committed: Fri Jan 2 18:34:38 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/WindowedDataStream.java      |  54 +++++
 .../flink/api/scala/streaming/DataStream.scala  |  27 +++
 .../streaming/StreamExecutionEnvironment.scala  |  14 +-
 .../scala/streaming/WindowedDataStream.scala    | 232 +++++++++++++++++++
 4 files changed, 326 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/80393c4a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index cb9cd04..287f29d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -260,6 +260,30 @@ public class WindowedDataStream<OUT> {
 	}
 
 	/**
+	 * Applies a reduceGroup transformation on the windowed data stream by
+	 * reducing the current window at every trigger. In contrast with the
+	 * standard binary reducer, reduceGroup lets the user access all
+	 * elements of the window at the same time through the iterable interface.
+	 * The user can also extend {@link RichGroupReduceFunction} to gain
+	 * access to the other features provided by the
+	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 * <br/><br/> This version of reduceGroup uses user-supplied type
+	 * information for serialization. Use it only when the system is unable
+	 * to detect the type information automatically when calling
+	 * {@link #reduceGroup(GroupReduceFunction)}.
+	 * 
+	 * @param reduceFunction
+	 *            The reduce function that will be applied to the windows.
+	 * @param outType
+	 *            The type information describing the output type.
+	 * @return The transformed DataStream
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> reduceGroup(
+			GroupReduceFunction<OUT, R> reduceFunction, TypeInformation<R> outType) {
+
+		return dataStream.transform("NextGenWindowReduce", outType,
+				getReduceGroupInvokable(reduceFunction));
+	}
+
+	/**
 	 * Applies an aggregation that sums every window of the data stream at the
 	 * given position.
 	 * 
@@ -335,6 +359,19 @@ public class WindowedDataStream<OUT> {
 	/**
 	 * Applies an aggregation that gives the minimum element of every window of
 	 * the data stream by the given position. If multiple elements share the same
+	 * minimum value, the operator returns the first element by default.
+	 * 
+	 * @param positionToMinBy
+	 *            The position to minimize by
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> minBy(String positionToMinBy) {
+		return this.minBy(positionToMinBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of every window of
+	 * the data stream by the given position. If multiple elements share the same
 	 * minimum value, the operator returns either the first or the last one depending
 	 * on the parameter setting.
 	 * 
@@ -418,6 +455,19 @@ public class WindowedDataStream<OUT> {
 	/**
 	 * Applies an aggregation that gives the maximum element of every window of
 	 * the data stream by the given position. If multiple elements share the same
+	 * maximum value, the operator returns the first element by default.
+	 * 
+	 * @param positionToMaxBy
+	 *            The position to maximize by
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> maxBy(String positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of every window of
+	 * the data stream by the given position. If multiple elements share the same
 	 * maximum value, the operator returns either the first or the last one depending
 	 * on the parameter setting.
 	 * 
@@ -598,6 +648,10 @@ public class WindowedDataStream<OUT> {
 		return dataStream.getType();
 	}
 
+	public DataStream<OUT> getDataStream() {
+		return dataStream;
+	}
+
 	protected WindowedDataStream<OUT> copy() {
 		return new WindowedDataStream<OUT>(this);
 	}

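A minimal sketch of how this new overload is exercised through the Scala
wrapper added below, which forwards the implicitly resolved TypeInformation
for the result type; the input stream and the element types here are
assumptions for illustration, not part of the commit:

    // Hypothetical: `windowed` is a WindowedDataStream[String]; the Scala
    // wrapper passes an implicit TypeInformation[Int] to this Java overload.
    val totalLengths: DataStream[Int] = windowed.reduceGroup {
      (in: Iterable[String], out: Collector[Int]) =>
        out.collect(in.map(_.length).sum) // one aggregate per window trigger
    }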
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/80393c4a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
index e96f5eb..69b8359 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -38,6 +38,10 @@ import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.common.functions.FilterFunction
 import org.apache.flink.streaming.api.function.sink.SinkFunction
+import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy
+import scala.collection.JavaConversions._
 
 class DataStream[T](javaStream: JavaStream[T]) {
 
@@ -397,6 +401,29 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
+   * Creates a WindowedDataStream that can be used to apply
+   * transformations like .reduce(...) or aggregations on
+   * preset chunks (windows) of the data stream. To define the windows, one or
+   * more WindowingHelper-s such as Time, Count and
+   * Delta can be used.<br/><br/> When applied to a grouped data
+   * stream, the windows (evictions) and slide sizes (triggers) will be
+   * computed on a per-group basis. <br/><br/> For more advanced control over
+   * the trigger and eviction policies, please use
+   * window(List(triggers), List(evicters)).
+   */
+  def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
+    new WindowedDataStream[T](javaStream.window(windowingHelper: _*))
+
+  /**
+   * Creates a WindowedDataStream using the given TriggerPolicy-s and EvictionPolicy-s.
+   * Windowing can be used to apply transformations like .reduce(...) or aggregations on
+   * preset chunks (windows) of the data stream.<br/><br/> For the most common
+   * use cases, please refer to window(WindowingHelper[_]*).
+   *
+   */
+  def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]): WindowedDataStream[T] =
+    new WindowedDataStream[T](javaStream.window(triggers, evicters))
+
+  /**
    * Writes a DataStream to the standard output stream (stdout). For each
    * element of the DataStream the result of .toString is
    * written.

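A short usage sketch of the new window(...) call combined with every(...)
from the WindowedDataStream below; the Time and Count helpers are assumed
to come from org.apache.flink.streaming.api.windowing.helper, and the
input stream is illustrative:

    // Hypothetical: evict elements older than 5 seconds, trigger the user
    // function every 100 elements; on a grouped stream both are per group.
    // (Assumes java.util.concurrent.TimeUnit is imported.)
    val windowed = stream
      .window(Time.of(5, TimeUnit.SECONDS))
      .every(Count.of(100))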
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/80393c4a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
index e4a7b48..dadfde2 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.streaming
 
+import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.environment.{ StreamExecutionEnvironment => JavaEnv }
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.commons.lang.Validate
@@ -26,6 +27,8 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource
 import org.apache.flink.streaming.api.invokable.SourceInvokable
 import org.apache.flink.streaming.api.function.source.FromElementsFunction
 import org.apache.flink.streaming.api.function.source.SourceFunction
+import scala.collection.JavaConversions._
+import org.apache.flink.util.Collector
 
 class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
@@ -113,7 +116,16 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * Creates a new DataStream that contains a sequence of numbers.
    *
    */
-  def generateSequence(from: Long, to: Long): DataStream[java.lang.Long] = new DataStream(javaEnv.generateSequence(from, to))
+  def generateSequence(from: Long, to: Long): DataStream[Long] = {
+    val source = new SourceFunction[Long] {
+      override def invoke(out: Collector[Long]) = {
+        for (i <- from.to(to)) {
+          out.collect(i)
+        }
+      }
+    }
+    addSource(source)
+  }
 
   /**
    * Creates a DataStream that contains the given elements. The elements must all be of the

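With this change the Scala API returns a DataStream of Scala Longs instead
of boxed java.lang.Longs, built on a small custom source. A hedged usage
sketch (the environment and the downstream map are illustrative):

    // Hypothetical: elements are now scala.Long, so Scala lambdas apply
    // without boxing friction.
    val numbers: DataStream[Long] = env.generateSequence(1, 100)
    val doubled = numbers.map(_ * 2)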
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/80393c4a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
new file mode 100644
index 0000000..ff89a47
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.streaming
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
+import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import scala.reflect.ClassTag
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.streaming.api.invokable.operator.MapInvokable
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
+import org.apache.flink.api.common.functions.GroupReduceFunction
+import org.apache.flink.streaming.api.invokable.StreamInvokable
+import scala.collection.JavaConversions._
+
+class WindowedDataStream[T](javaStream: JavaWStream[T]) {
+
+  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
+    ClosureCleaner.clean(f, checkSerializable)
+    f
+  }
+
+  /**
+   * Defines the slide size (trigger frequency) for the windowed data stream.
+   * This controls how often the user defined function will be triggered on
+   * the window.
+   */
+  def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
+    new WindowedDataStream[T](javaStream.every(windowingHelper: _*))
+
+  /**
+   * Groups the elements of the WindowedDataStream using the given
+   * field positions. The window sizes (evictions) and slide sizes
+   * (triggers) will be calculated on the whole stream (in a central fashion),
+   * but the user-defined functions will be applied on a per-group basis.
+   * <br/><br/> To get windows and triggers on a per-group basis, apply the
+   * DataStream.window(...) operator on an already grouped data stream.
+   *
+   */
+  def groupBy(fields: Int*): WindowedDataStream[T] =
+    new WindowedDataStream[T](javaStream.groupBy(fields: _*))
+
+  /**
+   * Groups the elements of the WindowedDataStream using the given
+   * field expressions. The window sizes (evictions) and slide sizes
+   * (triggers) will be calculated on the whole stream (in a central fashion),
+   * but the user-defined functions will be applied on a per-group basis.
+   * <br/><br/> To get windows and triggers on a per-group basis, apply the
+   * DataStream.window(...) operator on an already grouped data stream.
+   *
+   */
+  def groupBy(firstField: String, otherFields: String*): WindowedDataStream[T] =
+    new WindowedDataStream[T](javaStream.groupBy(firstField +: otherFields.toArray: _*))
+
+  /**
+   * Groups the elements of the WindowedDataStream using the given
+   * KeySelector function. The window sizes (evictions) and slide sizes
+   * (triggers) will be calculated on the whole stream (in a central fashion),
+   * but the user-defined functions will be applied on a per-group basis.
+   * <br/><br/> To get windows and triggers on a per-group basis, apply the
+   * DataStream.window(...) operator on an already grouped data stream.
+   *
+   */
+  def groupBy[K: TypeInformation](fun: T => K): WindowedDataStream[T] = {
+
+    val keyExtractor = new KeySelector[T, K] {
+      val cleanFun = clean(fun)
+      def getKey(in: T) = cleanFun(in)
+    }
+    new WindowedDataStream[T](javaStream.groupBy(keyExtractor))
+  }
+
+  /**
+   * Applies a reduce transformation on the windowed data stream by reducing
+   * the current window at every trigger.
+   *
+   */
+  def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
+    if (reducer == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    new DataStream[T](javaStream.reduce(reducer))
+  }
+
+  /**
+   * Applies a reduce transformation on the windowed data stream by reducing
+   * the current window at every trigger.
+   *
+   */
+  def reduce(fun: (T, T) => T): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    val reducer = new ReduceFunction[T] {
+      val cleanFun = clean(fun)
+      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+    }
+    reduce(reducer)
+  }
+
+  /**
+   * Applies a reduceGroup transformation on the windowed data stream by reducing
+   * the current window at every trigger. In contrast with the simple binary reduce
+   * operator, reduceGroup exposes the whole window through the Iterable interface.
+   * <br/>
+   * <br/>
+   * Whenever possible, prefer reduce over reduceGroup for better efficiency.
+   */
+  def reduceGroup[R: ClassTag: TypeInformation](reducer: GroupReduceFunction[T, R]): DataStream[R] = {
+    if (reducer == null) {
+      throw new NullPointerException("GroupReduce function must not be null.")
+    }
+    new DataStream[R](javaStream.reduceGroup(reducer, implicitly[TypeInformation[R]]))
+  }
+
+  /**
+   * Applies a reduceGroup transformation on the windowed data stream by reducing
+   * the current window at every trigger. In contrast with the simple binary reduce
+   * operator, reduceGroup exposes the whole window through the Iterable interface.
+   * <br/>
+   * <br/>
+   * Whenever possible, prefer reduce over reduceGroup for better efficiency.
+   */
+  def reduceGroup[R: ClassTag: TypeInformation](fun: (Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("GroupReduce function must not be null.")
+    }
+    val reducer = new GroupReduceFunction[T, R] {
+      val cleanFun = clean(fun)
+      def reduce(in: java.lang.Iterable[T], out: Collector[R]) = { cleanFun(in, out) }
+    }
+    reduceGroup(reducer)
+  }
+
+  /**
+   * Applies an aggregation that gives the maximum of the elements in the window at
+   * the given position.
+   *
+   */
+  def max(field: Any): DataStream[T] = field match {
+    case field: Int => new DataStream[T](javaStream.max(field))
+    case field: String => new DataStream[T](javaStream.max(field))
+    case _ => throw new IllegalArgumentException(
+      "Aggregations are only supported by field position (Int) or field expression (String)")
+  }
+
+  /**
+   * Applies an aggregation that gives the minimum of the elements in the window at
+   * the given position.
+   *
+   */
+  def min(field: Any): DataStream[T] = field match {
+    case field: Int => new DataStream[T](javaStream.min(field))
+    case field: String => new DataStream[T](javaStream.min(field))
+    case _ => throw new IllegalArgumentException(
+      "Aggregations are only supported by field position (Int) or field expression (String)")
+  }
+
+  /**
+   * Applies an aggregation that sums the elements in the window at the given position.
+   *
+   */
+  def sum(field: Any): DataStream[T] = field match {
+    case field: Int => new DataStream[T](javaStream.sum(field))
+    case field: String => new DataStream[T](javaStream.sum(field))
+    case _ => throw new IllegalArgumentException(
+      "Aggregations are only supported by field position (Int) or field expression (String)")
+  }
+
+  /**
+   * Applies an aggregation that gives the maximum element of the window by
+   * the given position. If multiple elements share the maximum value, the
+   * first one is returned.
+   *
+   */
+  def maxBy(field: Any): DataStream[T] = field match {
+    case field: Int => new DataStream[T](javaStream.maxBy(field))
+    case field: String => new DataStream[T](javaStream.maxBy(field))
+    case _ => throw new IllegalArgumentException(
+      "Aggregations are only supported by field position (Int) or field expression (String)")
+  }
+
+  /**
+   * Applies an aggregation that gives the minimum element of the window by
+   * the given position. If multiple elements share the minimum value, the
+   * first one is returned.
+   *
+   */
+  def minBy(field: Any): DataStream[T] = field match {
+    case field: Int => new DataStream[T](javaStream.minBy(field))
+    case field: String => new DataStream[T](javaStream.minBy(field))
+    case _ => throw new IllegalArgumentException(
+      "Aggregations are only supported by field position (Int) or field expression (String)")
+  }
+
+  /**
+   * Applies an aggregation that gives the minimum element of the window by
+   * the given position. If multiple elements share the minimum value, the
+   * first parameter controls whether the first or the last such element is
+   * returned.
+   *
+   */
+  def minBy(field: Any, first: Boolean): DataStream[T] = field match {
+    case field: Int => new DataStream[T](javaStream.minBy(field, first))
+    case field: String => new DataStream[T](javaStream.minBy(field, first))
+    case _ => throw new IllegalArgumentException(
+      "Aggregations are only supported by field position (Int) or field expression (String)")
+  }
+
+  /**
+   * Applies an aggregation that gives the maximum element of the window by
+   * the given position. If multiple elements share the maximum value, the
+   * first parameter controls whether the first or the last such element is
+   * returned.
+   *
+   */
+  def maxBy(field: Any, first: Boolean): DataStream[T] = field match {
+    case field: Int => new DataStream[T](javaStream.maxBy(field, first))
+    case field: String => new DataStream[T](javaStream.maxBy(field, first))
+    case _ => throw new IllegalArgumentException(
+      "Aggregations are only supported by field position (Int) or field expression (String)")
+  }
+
+}
\ No newline at end of file
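Putting the pieces together, a minimal end-to-end sketch against the new
Scala windowing API; the Count helper, the tuple-shaped data, and the field
positions are assumptions for illustration:

    // Hypothetical pipeline: count-based windows of two elements, grouped
    // by the first tuple field, keeping the per-window maximum of the
    // second field (ties resolved to the first element by default).
    val maxima = env.fromElements((1, 4.0), (1, 9.5), (2, 3.2))
      .window(Count.of(2))
      .groupBy(0)
      .maxBy(1)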

