flink-commits mailing list archives

From: mbala...@apache.org
Subject: [08/27] incubator-flink git commit: [scala] [streaming] Extended scala data stream functionality to include simple operators
Date: Sun, 04 Jan 2015 20:50:58 GMT
[scala] [streaming] Extended scala data stream functionality to include simple operators


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c123e11a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c123e11a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c123e11a

Branch: refs/heads/master
Commit: c123e11a01a801419f2ed53814b22d0ad638f98c
Parents: 34353f6
Author: Gyula Fora <gyfora@apache.org>
Authored: Fri Dec 12 00:12:49 2014 +0100
Committer: Gyula Fora <gyfora@apache.org>
Committed: Fri Jan 2 18:34:38 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  28 +++
 .../api/datastream/GroupedDataStream.java       |   4 +
 .../flink/api/scala/streaming/DataStream.scala  | 233 ++++++++++++++++++-
 3 files changed, 257 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c123e11a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 04929c1..1cf8d72 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -677,6 +677,20 @@ public class DataStream<OUT> {
 	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
 		return this.minBy(positionToMinBy, true);
 	}
+	
+	/**
+	 * Applies an aggregation that gives the current element with the
+	 * minimum value at the given position. If more elements have the minimum
+	 * value at the given position, the operator returns the first one by
+	 * default.
+	 * 
+	 * @param positionToMinBy
+	 *            The position in the data point to minimize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> minBy(String positionToMinBy) {
+		return this.minBy(positionToMinBy, true);
+	}
 
 	/**
 	 * Applies an aggregation that gives the current element with the
@@ -710,6 +724,20 @@ public class DataStream<OUT> {
 	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
 		return this.maxBy(positionToMaxBy, true);
 	}
+	
+	/**
+	 * Applies an aggregation that gives the current element with the
+	 * maximum value at the given position. If more elements have the maximum
+	 * value at the given position, the operator returns the first one by
+	 * default.
+	 * 
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> maxBy(String positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
+	}
 
 	/**
 	 * Applies an aggregation that gives the current element with the
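
Usage note: the new String overloads let minBy/maxBy address a field by
expression rather than by tuple position. A minimal sketch of how they
surface through the Scala wrapper added below; the case class, the `env`
source and the sample values are hypothetical, not part of this commit:

  // hypothetical element type and environment, for illustration only
  case class SensorReading(id: String, speed: Double)
  val readings: DataStream[SensorReading] =
    env.fromElements(SensorReading("a", 3.2), SensorReading("b", 1.7))

  val slowest = readings.minBy("speed")       // dispatches to the new minBy(String)
  val fastest = readings.maxBy("speed", true) // maxBy(String, first): keep first max on ties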

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c123e11a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index a2c0f89..18b4b75 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -47,6 +47,10 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 		this.keySelector = dataStream.keySelector;
 	}
 
+	public KeySelector<OUT, ?> getKeySelector() {
+		return this.keySelector;
+	}
+
 	/**
 	 * Applies a reduce transformation on the grouped data stream grouped on by
 	 * the given key position. The {@link ReduceFunction} will receive input
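
Usage note: getKeySelector() exists so the Scala reduce (in the wrapper
below) can detect a grouped stream and build a GroupedReduceInvokable over
the same key. A minimal sketch of the user-visible effect, assuming a
hypothetical `pairs: DataStream[(String, Int)]` built elsewhere:

  val counts = pairs
    .groupBy(0)                            // wraps a Java GroupedDataStream
    .reduce((a, b) => (a._1, a._2 + b._2)) // reduce reuses the key via getKeySelector()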

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c123e11a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
index 711ce7c..b10bdc6 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -17,15 +17,28 @@
  */
 
 package org.apache.flink.api.scala.streaming
+import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import scala.reflect.ClassTag
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.streaming.api.invokable.operator.MapInvokable
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.streaming.api.invokable.StreamInvokable
+import org.apache.flink.streaming.api.datastream.GroupedDataStream
+import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable
+import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable
+import org.apache.flink.streaming.api.datastream.GroupedDataStream
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.common.functions.FilterFunction
 
-class DataStream[OUT](javaStream: JavaStream[OUT]) {
+class DataStream[T](javaStream: JavaStream[T]) {
 
   /* This code is originally from the Apache Spark project. */
   /**
@@ -46,29 +59,233 @@ class DataStream[OUT](javaStream: JavaStream[OUT]) {
   }
 
   /**
+   * Gets the underlying java DataStream object.
+   */
+  private[flink] def getJavaStream: JavaStream[T] = javaStream
+
+  /**
+   * Sets the degree of parallelism of this operation. This must be at least 1.
+   */
+  def setParallelism(dop: Int) = {
+    javaStream match {
+      case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(dop)
+      case _ =>
+        throw new UnsupportedOperationException("Operator " + javaStream.toString + " cannot have " +
+          "parallelism.")
+    }
+    this
+  }
+
+  /**
+   * Returns the degree of parallelism of this operation.
+   */
+  def getParallelism: Int = javaStream match {
+    case op: SingleOutputStreamOperator[_, _] => op.getParallelism
+    case _ =>
+      throw new UnsupportedOperationException("Operator " + javaStream.toString + " does not have " +
+        "parallelism.")
+  }
+
+  def merge(dataStreams: DataStream[T]*): DataStream[T] =
+    new DataStream[T](javaStream.merge(dataStreams.map(_.getJavaStream): _*))
+
+  def groupBy(fields: Int*): DataStream[T] =
+    new DataStream[T](javaStream.groupBy(fields: _*))
+
+  def groupBy(firstField: String, otherFields: String*): DataStream[T] =
+    new DataStream[T](javaStream.groupBy(firstField +: otherFields.toArray: _*))
+
+  def groupBy[K: TypeInformation](fun: T => K): DataStream[T] = {
+
+    val keyExtractor = new KeySelector[T, K] {
+      val cleanFun = clean(fun)
+      def getKey(in: T) = cleanFun(in)
+    }
+    new DataStream[T](javaStream.groupBy(keyExtractor))
+  }
+
+  def partitionBy(fields: Int*): DataStream[T] =
+    new DataStream[T](javaStream.partitionBy(fields: _*))
+
+  def partitionBy(firstField: String, otherFields: String*): DataStream[T] =
+    new DataStream[T](javaStream.partitionBy(firstField +: otherFields.toArray: _*))
+
+  def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = {
+
+    val keyExtractor = new KeySelector[T, K] {
+      val cleanFun = clean(fun)
+      def getKey(in: T) = cleanFun(in)
+    }
+    new DataStream[T](javaStream.partitionBy(keyExtractor))
+  }
+
+  def broadcast: DataStream[T] = new DataStream[T](javaStream.broadcast())
+
+  def shuffle: DataStream[T] = new DataStream[T](javaStream.shuffle())
+
+  def forward: DataStream[T] = new DataStream[T](javaStream.forward())
+
+  def distribute: DataStream[T] = new DataStream[T](javaStream.distribute())
+
+  def max(field: Any): DataStream[T] = field match {
+    case field: Int => return new DataStream[T](javaStream.max(field))
+    case field: String => return new DataStream[T](javaStream.max(field))
+    case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
+  }
+
+  def min(field: Any): DataStream[T] = field match {
+    case field: Int => return new DataStream[T](javaStream.min(field))
+    case field: String => return new DataStream[T](javaStream.min(field))
+    case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
+  }
+
+  def sum(field: Any): DataStream[T] = field match {
+    case field: Int => return new DataStream[T](javaStream.sum(field))
+    case field: String => return new DataStream[T](javaStream.sum(field))
+    case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
+  }
+
+  def maxBy(field: Any): DataStream[T] = field match {
+    case field: Int => return new DataStream[T](javaStream.maxBy(field))
+    case field: String => return new DataStream[T](javaStream.maxBy(field))
+    case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
+  }
+
+  def minBy(field: Any): DataStream[T] = field match {
+    case field: Int => return new DataStream[T](javaStream.minBy(field))
+    case field: String => return new DataStream[T](javaStream.minBy(field))
+    case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
+  }
+
+  def minBy(field: Any, first: Boolean): DataStream[T] = field match {
+    case field: Int => return new DataStream[T](javaStream.minBy(field, first))
+    case field: String => return new DataStream[T](javaStream.minBy(field, first))
+    case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
+  }
+
+  def maxBy(field: Any, first: Boolean): DataStream[T] = field match {
+    case field: Int => return new DataStream[T](javaStream.maxBy(field, first))
+    case field: String => return new DataStream[T](javaStream.maxBy(field, first))
+    case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
+  }
+
+  def count: DataStream[java.lang.Long] = new DataStream[java.lang.Long](javaStream.count())
+
+  /**
    * Creates a new DataStream by applying the given function to every element of this DataStream.
    */
-  def map[R: TypeInformation: ClassTag](fun: OUT => R): DataStream[R] = {
+  def map[R: TypeInformation: ClassTag](fun: T => R): DataStream[R] = {
     if (fun == null) {
       throw new NullPointerException("Map function must not be null.")
     }
-    val mapper = new MapFunction[OUT, R] {
+    val mapper = new MapFunction[T, R] {
       val cleanFun = clean(fun)
-      def map(in: OUT): R = cleanFun(in)
+      def map(in: T): R = cleanFun(in)
     }
 
-    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[OUT, R](mapper)))
+    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper)))
   }
 
   /**
    * Creates a new DataStream by applying the given function to every element of this DataStream.
    */
-  def map[R: TypeInformation: ClassTag](mapper: MapFunction[OUT, R]): DataStream[R] = {
+  def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataStream[R] = {
     if (mapper == null) {
       throw new NullPointerException("Map function must not be null.")
     }
 
-    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[OUT, R](mapper)))
+    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper)))
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element and flattening
+   * the results.
+   */
+  def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataStream[R] = {
+    if (flatMapper == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+    new DataStream[R](javaStream.transform("flatMap", implicitly[TypeInformation[R]], new FlatMapInvokable[T, R](flatMapper)))
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element and flattening
+   * the results.
+   */
+  def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+    val flatMapper = new FlatMapFunction[T, R] {
+      val cleanFun = clean(fun)
+      def flatMap(in: T, out: Collector[R]) { cleanFun(in, out) }
+    }
+    flatMap(flatMapper)
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element and flattening
+   * the results.
+   */
+  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+    val flatMapper = new FlatMapFunction[T, R] {
+      val cleanFun = clean(fun)
+      def flatMap(in: T, out: Collector[R]) { cleanFun(in) foreach out.collect }
+    }
+    flatMap(flatMapper)
+  }
+
+  /**
+   * Creates a new [[DataStream]] by merging the elements of this DataStream using an associative reduce
+   * function.
+   */
+  def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
+    if (reducer == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    javaStream match {
+      case ds: GroupedDataStream[_] => new DataStream[T](javaStream.transform("reduce", javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector())))
+      case _ => new DataStream[T](javaStream.transform("reduce", javaStream.getType(), new StreamReduceInvokable[T](reducer)))
+    }
+  }
+
+  /**
+   * Creates a new [[DataStream]] by merging the elements of this DataStream using an associative reduce
+   * function.
+   */
+  def reduce(fun: (T, T) => T): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    val reducer = new ReduceFunction[T] {
+      val cleanFun = clean(fun)
+      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+    }
+    reduce(reducer)
+  }
+
+  /**
+   * Creates a new DataStream that contains only the elements satisfying the given filter predicate.
+   */
+  def filter(filter: FilterFunction[T]): DataStream[T] = {
+    if (filter == null) {
+      throw new NullPointerException("Filter function must not be null.")
+    }
+    new DataStream[T](javaStream.filter(filter))
+  }
+
+  def filter(fun: T => Boolean): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Filter function must not be null.")
+    }
+    val filter = new FilterFunction[T] {
+      val cleanFun = clean(fun)
+      def filter(in: T) = cleanFun(in)
+    }
+    this.filter(filter)
   }
 
   def print() = javaStream.print()
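
Usage note: taken together, the extended wrapper supports pipelines like the
following word-count sketch. The `env` source and input strings are
hypothetical, and `import org.apache.flink.api.scala._` is assumed to be in
scope for the implicit TypeInformation:

  val text: DataStream[String] = env.fromElements("to be", "or not to be")

  val counts = text
    .flatMap(_.toLowerCase.split("\\W+"))  // new flatMap(T => TraversableOnce[R])
    .filter(_.nonEmpty)                    // new filter(T => Boolean)
    .map((_, 1))                           // map to (word, 1) pairs
    .groupBy(_._1)                         // new groupBy with a key-extractor function
    .reduce((a, b) => (a._1, a._2 + b._2)) // grouped reduce: running count per word
    .setParallelism(2)                     // new setParallelism on the reduce operator

  counts.print()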

