flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [10/27] incubator-flink git commit: [scala] [streaming] Added SplitDataStream functionality
Date Sun, 04 Jan 2015 20:51:00 GMT
[scala] [streaming] Added SplitDataStream functionality

Conflicts:
	flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/7aa68298
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/7aa68298
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/7aa68298

Branch: refs/heads/master
Commit: 7aa682982b980dd80955c243797d97f2c083ae7c
Parents: 80393c4
Author: mbalassi <mbalassi@apache.org>
Authored: Mon Dec 15 14:15:35 2014 +0100
Committer: Gyula Fora <gyfora@apache.org>
Committed: Fri Jan 2 18:34:38 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/SplitDataStream.java         |  4 +-
 .../flink/api/scala/streaming/DataStream.scala  | 33 ++++++++++++-
 .../api/scala/streaming/SplitDataStream.scala   | 49 ++++++++++++++++++++
 3 files changed, 83 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7aa68298/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index 5a8f038..4fac04c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -70,9 +70,9 @@ public class SplitDataStream<OUT> {
 		return returnStream;
 	}
 
-	private DataStream<OUT> selectOutput(String[] outputName) {
+	private DataStream<OUT> selectOutput(String[] outputNames) { // returns a copy of dataStream whose userDefinedNames are outputNames
 		DataStream<OUT> returnStream = dataStream.copy();
-		returnStream.userDefinedNames = Arrays.asList(outputName);
+		returnStream.userDefinedNames = Arrays.asList(outputNames);
 		return returnStream;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7aa68298/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
index 69b8359..f0932d8 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -41,7 +41,9 @@ import org.apache.flink.streaming.api.function.sink.SinkFunction
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy
+import org.apache.flink.streaming.api.collector.OutputSelector
 import scala.collection.JavaConversions._
+import java.util.HashMap
 
 class DataStream[T](javaStream: JavaStream[T]) {
 
@@ -423,7 +425,36 @@ class DataStream[T](javaStream: JavaStream[T]) {
  def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]): WindowedDataStream[T] = new WindowedDataStream[T](javaStream.window(triggers, evicters))
 
   /**
-   * >>>>>>> 12178aa... [scala] [streaming] Windowing functionality added to scala api
+   * Operator used for directing elements to specific named outputs using an
+   * OutputSelector. Calling this method on an operator creates a new
+   * SplitDataStream. Throws an UnsupportedOperationException if the
+   * underlying Java stream is not a SingleOutputStreamOperator.
+   */
+  def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream match {
+    case op: SingleOutputStreamOperator[_, _] => new SplitDataStream[T](op.split(selector))
+    case _ =>
+      throw new UnsupportedOperationException("Operator " + javaStream.toString + " can not be " +
+        "split.")
+  }
+
+  /**
+   * Splits the stream by routing each element to the named outputs returned
+   * by `fun`; use SplitDataStream.select to choose which outputs to consume.
+   */
+  def split(fun: T => TraversableOnce[String]): SplitDataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("OutputSelector must not be null.")
+    }
+    val cleanFun = clean(fun) // cleaned here so the selector does not reference DataStream.this
+    val selector = new OutputSelector[T] {
+      def select(in: T): java.lang.Iterable[String] = {
+        asJavaIterable(cleanFun(in).toIterable)
+      }
+    }
+    split(selector)
+  }
+
+  /**
     * Writes a DataStream to the standard output stream (stdout). For each
     * element of the DataStream the result of .toString is
     * written.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7aa68298/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
new file mode 100644
index 0000000..0b0cce5
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.streaming
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
+
+/**
+ * The SplitDataStream represents an operator that has been split using an
+ * [[org.apache.flink.streaming.api.collector.OutputSelector]]. Named outputs
+ * can be selected using the `select` function.
+ *
+ * @tparam T
+ *            The type of the output.
+ */
+class SplitDataStream[T](javaStream: SplitJavaStream[T]) {
+
+  /**
+   * Gets the underlying Java SplitDataStream object.
+   */
+  private[flink] def getJavaStream: SplitJavaStream[T] = javaStream
+
+  /**
+   * Sets the output names for which the next operator will receive values.
+   */
+  def select(outputNames: String*): DataStream[T] =
+    new DataStream[T](javaStream.select(outputNames: _*))
+
+  /**
+   * Selects all output names from a split data stream.
+   */
+  def selectAll(): DataStream[T] = new DataStream[T](javaStream.selectAll())
+
+}
\ No newline at end of file


Mime
View raw message